Apache Flink Job fails repeatedly due to RemoteTransportException

2020-01-28 Thread M Singh
Hi Folks:
We have a streaming Flink application (using v1.6.2) and it dies within 12
hours.  We have configured the number of restarts, which is 10 at the moment.
Sometimes the job runs for a while and then goes through a number of restarts
within a very short time and finally fails.  In other instances, the restarts
happen randomly, so there is no pattern that I could discern for the restarts.
I can increase the restart count, but I would like to see if there is any advice
on the root cause of this issue.  I've seen some emails in the user groups
but could not find any definitive solution or investigation steps.

Is there any advice on how to investigate this further or resolve it?
The exception we see in the job manager is:
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job testJob 
(d65a52389f9ea30def1fe522bf3956c6) switched from state FAILING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623'. This might indicate that 
the remote task manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not 
restart the job testJob (d65a52389f9ea30def1fe522bf3956c6) because the restart 
strategy prevented it.
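
For reference, restart behaviour like the "10 restarts" mentioned above is usually configured via a restart strategy; below is a minimal sketch of one way to set it programmatically (the attempt count and delay are illustrative placeholders, not the poster's actual settings):

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestartStrategyExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Allow at most 10 restart attempts, waiting 30 seconds between attempts.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(30)));
            // ... build and execute the job ...
        }
    }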


Flink+YARN HDFS replication factor

2020-01-28 Thread Piper Piper
Hello,

When using Flink+YARN (with HDFS) and running a long-lived Flink session
(mode) cluster with a Flink client submitting jobs, the HDFS could have a
replication factor greater than 1 (for example, 3).

So, I would like to know when and how any of the data (like event data or
batch data) or code (like the JAR) in a Flink job is saved to HDFS and
replicated across the YARN cluster nodes.

For example, in streaming applications, would all the event data stay only in
memory (RAM) until it reaches the DAG's sink and then have to be saved into
HDFS?

Thank you,

Piper


Re: REST rescale with Flink on YARN

2020-01-28 Thread Yang Wang
Gary is right. You could also access the YARN RM REST API to get the original
AM address.

http://hadoop.apache.org/docs/r2.8.5/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Application_API


Best,
Yang
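
A rough sketch of the programmatic route Gary mentions below (the application id string is only a placeholder taken from elsewhere in this thread; assumes the Hadoop 2.8 YarnClient API referenced in [1]):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class AmAddressLookup {
        public static void main(String[] args) throws Exception {
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(new YarnConfiguration());
            yarnClient.start();
            try {
                // Placeholder application id; use the id of your Flink session.
                ApplicationReport report = yarnClient.getApplicationReport(
                    ApplicationId.fromString("application_1576854986116_0079"));
                System.out.println("AM host: " + report.getHost() + ", RPC port: " + report.getRpcPort());
                System.out.println("Tracking URL: " + report.getTrackingUrl());
            } finally {
                yarnClient.stop();
            }
        }
    }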

Gary Yao wrote on Tuesday, January 28, 2020, at 6:17 PM:

> Hi,
>
> You can use
>
> yarn application -status 
>
> to find the host and port that the server is listening on (AM host & RPC
> Port). If you need to access that information programmatically, take a
> look at
> the YarnClient [1].
>
> Best,
> Gary
>
>
> [1]
> https://hadoop.apache.org/docs/r2.8.5/api/org/apache/hadoop/yarn/client/api/YarnClient.html
>
> On Thu, Jan 23, 2020 at 3:21 PM Vasily Melnik <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> Hi all,
>> I've found a solution for this issue.
>> The problem is that with the YARN ApplicationMaster URL we communicate with
>> the JobManager via a proxy which is implemented on Jetty 6 (for Hadoop 2.6).
>> So to use the PATCH method we need to locate the original JobManager URL.
>> Using the /jobmanager/config API we could get only the host, but web.port is
>> displayed as 0 (???)
>> To find the actual web port, we have to parse the YARN logs for the
>> JobManager, where we can find something like this:
>>
>> *INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint -
>> Rest endpoint listening at :.*
>>
>> Does anyone know a less complicated way to find the actual REST URL under
>> YARN?
>>
>>
>>
>>
>> Best regards,
>> Vasily Melnik
>>
>>
>> On Thu, 23 Jan 2020 at 15:32, Chesnay Schepler 
>> wrote:
>>
>>> Older versions of Jetty don't support PATCH requests. You will either
>>> have to update it or create a custom Flink version that uses POST for the
>>> rescale operation.
>>>
>>> On 23/01/2020 13:23, Vasily Melnik wrote:
>>>
>>> Hi all.
>>> I'm using Flink 1.8 on YARN with CDH 5.12
>>> When i try to perform rescale request:
>>>
>>> curl -v -X PATCH 
>>> '/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3
>>>  
>>> '
>>>
>>> I get an error:
>>>
>>> *Method PATCH is not defined in RFC 2068 and is not supported by the
>>> Servlet API*. GET and POST methods work well.
>>> The Server type in response is Jetty(6.1.26.cloudera.4).
>>>
>>> How can I deal with this situation?
>>>
>>> Best regards,
>>> Vasily Melnik
>>>
>>>
>>>


Re: is streaming outer join sending unnecessary traffic?

2020-01-28 Thread kant kodali
Sorry. fixed some typos.

I am doing a streaming outer join across four topics in Kafka; let's call them
sample1, sample2, sample3, sample4. Each of these test topics has just one
column, which is a tuple of one string field. My query is this:

SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0
FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN
sample4 on sample3.f0=sample4.f0
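
For context, the (true, ...) / (false, ...) rows shown below are the usual retract-stream encoding of a continuously updated join result: true marks an inserted/accumulated row, false a retraction of a previously emitted row. A minimal sketch of running such a query and converting it to a retract stream (the table registrations for sample1..sample4 are assumed to exist, e.g. in the attached Test.java, which is not reproduced here):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class OuterJoinRetractExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // sample1..sample4 are assumed to be registered as tables over the Kafka topics.
            Table joined = tEnv.sqlQuery(
                "SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0 "
                    + "FULL OUTER JOIN sample3 ON sample2.f0 = sample3.f0 "
                    + "FULL OUTER JOIN sample4 ON sample3.f0 = sample4.f0");

            // f0 of each Tuple2 is the add/retract flag printed in the output below.
            DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(joined, Row.class);
            retractStream.print();
            env.execute();
        }
    }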


And here is how I send messages to those Kafka topics at various times.

At time t1 Send a message "flink" to test-topic1

(true,flink,null,null,null) // Looks good

At time t2 Send a message "flink" to test-topic4

(true,null,null,null,flink) // Looks good

At time t3 Send a message "flink" to test-topic3

(false,null,null,null,flink) // Looks good
(true,null,null,flink,flink) //Looks good

At time t4 Send a message "flink" to test-topic2

(false,flink,null,null,null) // Looks good
(false,null,null,flink,flink) // Looks good
*(true,null,null,null,flink) // Redundant?*
*(false,null,null,null,flink) // Redundant?*
(true,flink,flink,flink,flink) //Looks good

Assume t1 < t2 < t3 < t4.

kant kodali wrote:

> Hi All,
>
> I am doing a streaming outer join from four topics in Kafka lets call them
> sample1, sample2, sample3, sample4. Each of these test topics has just one
> column which is of tuple string. my query is this
>
> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL 
> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on 
> sample3.f0=sample4.f0
>
>
> And here is how I send messages to those Kafka topics at various times.
>
> At time t1 Send a message "flink" to test-topic1
>
> (true,flink,null,null,null) // Looks good
>
> At time t2 Send a message "flink" to test-topic4
>
> (true,null,null,null,flink) // Looks good
>
> At time t3 Send a message "flink" to test-topic3
>
> (false,null,null,null,flink) // Looks good
> (true,null,null,flink,flink) //Looks good
>
> At time t3 Send a message "flink" to test-topic2
>
> (false,flink,null,null,null) // Looks good
> (false,null,null,flink,flink) // Looks good
> *(true,null,null,null,flink) // Redundant?*
> *(false,null,null,null,flink) // Redundant?*
> (true,flink,flink,flink,flink) //Looks good
>
> Those two rows above seem redundant to me, although the end result is
> correct. I don't see the same behavior if I join two topics. These unwanted
> messages will lead to a lot of database operations underneath, so is there any
> way to optimize this? I am using Flink 1.9, so I am not sure if this is already
> fixed in 1.10.
>
> Attached the code as well.
>
> Thanks!
> kant
>
>
>
>


is streaming outer join sending unnecessary traffic?

2020-01-28 Thread kant kodali
Hi All,

I am doing a streaming outer join across four topics in Kafka; let's call them
sample1, sample2, sample3, sample4. Each of these test topics has just one
column, which is a tuple of one string field. My query is this:

SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0
FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN
sample4 on sample3.f0=sample4.f0


And here is how I send messages to those Kafka topics at various times.

At time t1 Send a message "flink" to test-topic1

(true,flink,null,null,null) // Looks good

At time t2 Send a message "flink" to test-topic4

(true,null,null,null,flink) // Looks good

At time t3 Send a message "flink" to test-topic3

(false,null,null,null,flink) // Looks good
(true,null,null,flink,flink) //Looks good

At time t3 Send a message "flink" to test-topic2

(false,flink,null,null,null) // Looks good
(false,null,null,flink,flink) // Looks good
*(true,null,null,null,flink) // Redundant?*
*(false,null,null,null,flink) // Redundant?*
(true,flink,flink,flink,flink) //Looks good

Those two rows above seem redundant to me, although the end result is
correct. I don't see the same behavior if I join two topics. These unwanted
messages will lead to a lot of database operations underneath, so is there any
way to optimize this? I am using Flink 1.9, so I am not sure if this is already
fixed in 1.10.

Attached the code as well.

Thanks!
kant


Test.java
Description: Binary data


Does flink support retries on checkpoint write failures

2020-01-28 Thread Richard Deurwaarder
Hi all,

We've got a Flink job running on 1.8.0 which writes its state (rocksdb) to
Google Cloud Storage[1]. We've noticed that jobs with a large amount of
state (500gb range) are becoming *very* unstable. In the order of
restarting once an hour or even more.

The reason for this instability is that we run into "410 Gone"[4] errors
from Google Cloud Storage. This indicates an upload (write from Flink's
perspective) took place and it wanted to resume the write[2] but could not
find the file which it needed to resume. My guess is this is because the
previous attempt either failed or perhaps it uploads in chunks of 67mb [3].

The library logs this line when this happens:

"Encountered status code 410 when accessing URL
https://www.googleapis.com/upload/storage/v1/b//o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
Delegating to response handler for possible retry."

We're kind of stuck on these questions:
* Is Flink capable of doing these retries?
* Does anyone successfully write their (RocksDB) state to Google Cloud
Storage for bigger state sizes?
* Is it possible that Flink renames or deletes certain directories before all
flushes have been done, relying on an atomic guarantee provided by HDFS that
perhaps does not hold for other implementations? A race condition of sorts.

Basically does anyone recognize this behavior?

Regards,

Richard Deurwaarder

[1] We use an HDFS implementation provided by Google
https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
[2] https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
[3]
https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
(see
fs.gs.outputstream.upload.chunk.size)
[4] Stacktrace:
https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492


Flink distribution housekeeping for YARN sessions

2020-01-28 Thread Theo Diefenthal
Hi there, 

Today I realized that we currently have a lot of Flink distribution JAR files
that are not housekept, and I would like to know what to do about this, i.e. how
to properly housekeep them.

In the HDFS home directory of the submitting user, I find a subdirectory called
`.flink` with hundreds of subfolders like `application_1573731655031_0420`,
having the following structure:

-rw-r--r-- 3 dev dev 861 2020-01-27 21:17 
/user/dev/.flink/application_1580155950981_0010/4797ff6e-853b-460c-81b3-34078814c5c9-taskmanager-conf.yaml
 
-rw-r--r-- 3 dev dev 691 2020-01-27 21:16 
/user/dev/.flink/application_1580155950981_0010/application_1580155950981_0010-flink-conf.yaml2755466919863419496.tmp
 
-rw-r--r-- 3 dev dev 861 2020-01-27 21:17 
/user/dev/.flink/application_1580155950981_0010/fdb5ef57-c140-4f6d-9791-c226eb1438ce-taskmanager-conf.yaml
 
-rw-r--r-- 3 dev dev 92.2 M 2020-01-27 21:16 
/user/dev/.flink/application_1580155950981_0010/flink-dist_2.11-1.9.1.jar 
drwxr-xr-x - dev dev 0 2020-01-27 21:16 
/user/dev/.flink/application_1580155950981_0010/lib 
-rw-r--r-- 3 dev dev 2.6 K 2020-01-27 21:16 
/user/dev/.flink/application_1580155950981_0010/log4j.properties 
-rw-r--r-- 3 dev dev 2.3 K 2020-01-27 21:16 
/user/dev/.flink/application_1580155950981_0010/logback.xml 
drwxr-xr-x - dev dev 0 2020-01-27 21:16 
/user/dev/.flink/application_1580155950981_0010/plugins 

With tons of those folders (one for each Flink session we launched/killed in
our CI/CD pipeline), they sum up to some terabytes of used space in our HDFS.
I suppose I am killing our Flink sessions wrongly. We start and stop sessions
and jobs separately like so:

Start:
${OS_ROOT}/flink/bin/yarn-session.sh -jm 4g -tm 32g --name "${FLINK_SESSION_NAME}" -d -Denv.java.opts="-XX:+HeapDumpOnOutOfMemoryError"
${OS_ROOT}/flink/bin/flink run -m ${FLINK_HOST} [..savepoint/checkpoint options...] -d -n "${JOB_JAR}" $*

Stop:
${OS_ROOT}/flink/bin/flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME} -m ${FLINK_HOST} ${ID}
yarn application -kill "${ID}"

yarn application -kill was the best I could find, as the Flink documentation only
states that the Linux session process should be closed ("Stop the YARN session by
stopping the unix process (using CTRL+C) or by entering 'stop' into the client.").

Now my question: Is there a more elegant way to kill a yarn session (remotely 
from some host in the cluster, not necessarily the one starting the detached 
session), which also does the housekeeping then? Or should I do the 
housekeeping myself manually? (Pretty easy to script). Do I need to expect any 
more side effects when killing the session with "yarn application -kill"? 

Best regards 
Theo 

-- 
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln 
Theo Diefenthal 

T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575 
theo.diefent...@scoop-software.de - www.scoop-software.de 
Sitz der Gesellschaft: Köln, Handelsregister: Köln, 
Handelsregisternummer: HRB 36625 
Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen, 
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel 


Reinterpreting a pre-partitioned data stream as keyed stream

2020-01-28 Thread KristoffSC
Hi all,
we have a use case where the order of received events matters and it should be
kept across the pipeline.

Our pipeline would be parallelized. We can key the stream just after the Source
operator, but in order to keep the ordering in the subsequent operators we would
have to keep the stream keyed.

Obviously we could key again and again, but this would cause some performance
penalty.
We were thinking about using DataStreamUtils.reinterpretAsKeyedStream
instead.

Since this is an experimental feature, I would like to ask if there is
someone in the community who is using it? Do we know about any
open issues regarding this feature?
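
For reference, a minimal hedged sketch of the experimental API in question (the Event type and key extraction are made-up placeholders):

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class ReinterpretExample {
        // Placeholder event type.
        public static class Event {
            public String key;
            public long payload;
        }

        public static KeyedStream<Event, String> keepKeyed(DataStream<Event> alreadyPartitioned) {
            // The stream must already be partitioned exactly as the key selector implies,
            // e.g. because it was keyed by the same field right after the source.
            KeySelector<Event, String> bySameKey = event -> event.key;
            return DataStreamUtils.reinterpretAsKeyedStream(alreadyPartitioned, bySameKey);
        }
    }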

Thanks,
Krzysztof



Re: Flink RocksDB logs filling up disk space

2020-01-28 Thread Ahmad Hassan
Hello Yun,

With no checkpointing it is an even bigger problem, because if we rely on
Flink's auto commit and it fails to commit once due to an outage or Kafka
rebalancing, then it never retries again, and that means a full outage on
live systems.

For sure we need checkpointing for other reasons too, i.e. high availability
and state recovery.

Best,

On Tue, 28 Jan 2020 at 14:22, Yun Tang  wrote:

> Hi Ahmad
>
> We mainly recommend our users to set the checkpoint interval to three
> minutes.
> If you don't rely on keyed state persistence, you could also disable
> checkpointing and let the Kafka client commit offsets automatically
> [1], which might be the most light-weight solution.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>
> Best
> Yun Tang
> --
> *From:* Ahmad Hassan 
> *Sent:* Tuesday, January 28, 2020 17:43
> *To:* user 
> *Subject:* Re: Flink RocksDB logs filling up disk space
>
> Hi Yun,
>
> Thank you for pointing that out. In our production landscapes with live
> customers, we have 10 second checkpoint interval and 7MB of average
> checkpoint size. We do incremental checkpoints. If we keep the checkpoint
> interval longer (i.e. 1 minute) then the kafka consumer lag starts
> increasing. The reason is that over the period of 1 minute, the checkpoint
> size grows and the job takes a long time to do the checkpoint and as a result
> kafka consumer lag for our live traffic goes high. In order to keep the
> checkpoint size small, we tried the 10 second option which is working out well
> and our kafka lag never exceeds 20 messages on average. But I agree
> with you that 10 second option does not feel right and is too frequent in
> my opinion.
>
> Do you have any recommendations for checkpointing interval please ?
>
> Best Regards,
>
>
> On Tue, 28 Jan 2020 at 07:46, Yun Tang  wrote:
>
> Hi Ahmad
>
> Apart from setting the logger level of RocksDB, I also wonder why you
> would see RocksDB checkpoint IO logs filling up disk space very
> quickly. How large is the local checkpoint state and how long is the
> checkpoint interval? I think you might have given too short a checkpoint
> interval; even if you could avoid recording too many logs, I don't think
> the current checkpoint configuration is appropriate.
>
> Best
> Yun Tang
> --
> *From:* Ahmad Hassan 
> *Sent:* Monday, January 27, 2020 20:22
> *To:* user 
> *Subject:* Re: Flink RocksDB logs filling up disk space
>
>
> Thanks Chesnay!
>
> On Mon, 27 Jan 2020 at 11:29, Chesnay Schepler  wrote:
>
> Please see https://issues.apache.org/jira/browse/FLINK-15068
>
> On 27/01/2020 12:22, Ahmad Hassan wrote:
>
> Hello,
>
> In our production systems, we see that flink rocksdb checkpoint IO logs
> are filling up disk space very very quickly in the order of GB's as the
> logging is very verbose. How do we disable or suppress these logs please ?
> The rocksdb file checkpoint.cc is dumping huge amount of checkpoint logs
> like
>
> Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());
>
>
> Best Regards,
>
>
>


Re: Flink RocksDB logs filling up disk space

2020-01-28 Thread Yun Tang
Hi Ahmad

We mainly recommend our users to set the checkpoint interval to three minutes.
If you don't rely on keyed state persistence, you could also disable
checkpointing and let the Kafka client commit offsets automatically [1], which
might be the most light-weight solution.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

Best
Yun Tang
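
A minimal sketch of the kind of configuration suggested above (the interval, the checkpoint path and the use of incremental RocksDB checkpoints are illustrative placeholders):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint roughly every three minutes, as recommended above.
            env.enableCheckpointing(3 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
            // Give the job time to make progress between checkpoints.
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
            // Incremental RocksDB checkpoints keep the uploaded delta small.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        }
    }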

From: Ahmad Hassan 
Sent: Tuesday, January 28, 2020 17:43
To: user 
Subject: Re: Flink RocksDB logs filling up disk space

Hi Yun,

Thank you for pointing that out. In our production landscapes with live 
customers, we have 10 second checkpoint interval and 7MB of average checkpoint 
size. We do incremental checkpoints. If we keep the checkpoint interval longer 
(i.e. 1 minute) then the kafka consumer lag starts increasing. The reason is 
that over the period of 1 minute, the checkpoint size grows and the job takes a
long time to do the checkpoint, and as a result the kafka consumer lag for our
live traffic goes high. In order to keep the checkpoint size small, we tried the
10 second option, which is working out well and our kafka lag never exceeds 20
messages on average. But I agree with you that the 10 second option does not feel
right and is too frequent in my opinion.

Do you have any recommendations for checkpointing interval please ?

Best Regards,


On Tue, 28 Jan 2020 at 07:46, Yun Tang <myas...@live.com> wrote:
Hi Ahmad

Apart from setting the logger level of RocksDB, I also wonder why you would
see RocksDB checkpoint IO logs filling up disk space very quickly.
How large is the local checkpoint state and how long is the checkpoint interval?
I think you might have given too short a checkpoint interval; even if you could
avoid recording too many logs, I don't think the current checkpoint
configuration is appropriate.

Best
Yun Tang

From: Ahmad Hassan <ahmad.has...@gmail.com>
Sent: Monday, January 27, 2020 20:22
To: user <user@flink.apache.org>
Subject: Re: Flink RocksDB logs filling up disk space


Thanks Chesnay!

On Mon, 27 Jan 2020 at 11:29, Chesnay Schepler <ches...@apache.org> wrote:
Please see https://issues.apache.org/jira/browse/FLINK-15068

On 27/01/2020 12:22, Ahmad Hassan wrote:
Hello,

In our production systems, we see that flink rocksdb checkpoint IO logs are 
filling up disk space very very quickly in the order of GB's as the logging is 
very verbose. How do we disable or suppress these logs please ? The rocksdb 
file checkpoint.cc is dumping huge amount of checkpoint logs like

Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());


Best Regards,




Re: Flink ParquetAvroWriters Sink

2020-01-28 Thread aj
I was able to resolve this issue by setting classloader.resolve-order to
parent-first.
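
For anyone hitting the same classloading issue, the setting referred to above goes into flink-conf.yaml (shown here as a sketch; the default value is child-first):

    classloader.resolve-order: parent-first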

On Wed, Jan 22, 2020, 23:13 aj  wrote:

> Hi Arvid,
>
> I have implemented the code with envelope schema as you suggested but now
> I am facing issues with the consumer . I have written code like this:
>
> FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
>     new FlinkKafkaConsumer010<>(KAFKA_TOPICS,
>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>         properties);
>
> And the Deserialization class looks like this :
>
> public class KafkaGenericAvroDeserializationSchema implements
>         KeyedDeserializationSchema<GenericRecord> {
>
>     private final String registryUrl;
>     private transient KafkaAvroDeserializer inner;
>
>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>         this.registryUrl = registryUrl;
>     }
>
>     @Override
>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>             String topic, int partition, long offset) {
>         checkInitialized();
>         return (GenericRecord) inner.deserialize(topic, message);
>     }
>
>     @Override
>     public boolean isEndOfStream(GenericRecord nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     private void checkInitialized() {
>         if (inner == null) {
>             Map<String, Object> props = new HashMap<>();
>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>                 registryUrl);
>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>             SchemaRegistryClient client =
>                 new CachedSchemaRegistryClient(
>                     registryUrl,
>                     AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>             inner = new KafkaAvroDeserializer(client, props);
>         }
>     }
> }
>
>
> It's working locally on my machine but when I deployed it on yarn cluster
> I am getting below exception:
>
>
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread
> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
> .performDefaultAction(SourceStreamTask.java:132)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:298)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
> .java:104)
> at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
> StreamSourceContexts.java:111)
> at org.apache.flink.streaming.connectors.kafka.internals.
> AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at org.apache.flink.streaming.connectors.kafka.internal.
> Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher
> .runFetchLoop(Kafka09Fetcher.java:156)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
> .run(FlinkKafkaConsumerBase.java:715)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
> instance of class: org.apache.avro.Schema$LockableArrayList
> Serialization trace:
> types (org.apache.avro.Schema$UnionSchema)
> schema (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
>

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread Arvid Heise
Yes, the default is writing to an external system. Especially if you want
SQL, then there is currently no other way around it.

The drawbacks of writing to external systems are: additional maintenance of
another system and higher latency.

On Tue, Jan 28, 2020 at 11:49 AM kant kodali  wrote:

> Hi Arvid,
>
> I am trying to understand your statement. I am new to Flink so excuse me
> if I don't know something I should have known. ProcessFunction just process
> the records right? If so, how is it better than writing to an external
> system? At the end of the day I want to be able to query it (doesn't have
> to be through Queryable state and actually I probably don't want to use
> Queryable state for its limitations). But ideally I want to be able to
> query the intermediate states using SQL and hopefully, the store that is
> maintaining the intermediate state has some sort of index support so the
> read queries are faster than doing the full scan.
>
> Also, I hear Querying intermediate state just like one would in a database
> is a widely requested feature so its a bit surprising that this is not
> solved just yet but I am hopeful!
>
> Thanks!
>
>
>
> On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise  wrote:
>
>> Hi Kant,
>>
>> just wanted to mention the obvious. If you add a ProcessFunction right
>> after the join, you could maintain a user state with the same result. That
>> will of course blow up the data volume by a factor of 2, but may still be
>> better than writing to an external system.
>>
>> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines
>>> changed.
>>> Thanks for the details, Jark!
>>>
>>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu  wrote:
>>>
 Hi Kant,
 Having a custom state backend is very difficult and is not recommended.

 Hi Benoît,
 Yes, the "Query on the intermediate state is on the roadmap" I
 mentioned is referring to integrate Table API & SQL with Queryable State.
 We also have an early issue FLINK-6968 to tracks this.

 Best,
 Jark


 On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
 benoit.pa...@centraliens-lille.org> wrote:

> Hi all!
>
> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
> on the intermediate state is on the roadmap"?
> Are you referring to working on
> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>
> Cheers
> Ben
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>
>
> On Thu, Jan 23, 2020 at 6:40 AM kant kodali 
> wrote:
>
>> Is it a common practice to have a custom state backend? If so, what
>> would be a popular custom backend?
>>
>> Can I use Elasticsearch as a state backend?
>>
>> Thanks!
>>
>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>>
>>> Hi Kant,
>>>
>>> 1) List of row is also sufficient in this case. Using a MapState is
>>> in order to retract a row faster, and save the storage size.
>>>
>>> 2) State Process API is usually used to process save point. I’m
>>> afraid the performance is not good to use it for querying.
>>> On the other side, AFAIK, State Process API requires the uid of
>>> operator. However, uid of operators is not set in Table API & SQL.
>>> So I’m not sure whether it works or not.
>>>
>>> 3)You can have a custom statebackend by
>>> implement org.apache.flink.runtime.state.StateBackend interface, and 
>>> use it
>>> via `env.setStateBackend(…)`.
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 22 Jan 2020 at 14:16, kant kodali 
>>> wrote:
>>>
 Hi Jark,

 1) shouldn't it be a col1 to List of row? multiple rows can have
 the same joining key right?

 2) Can I use state processor API
 
 from an external application to query the intermediate results in near
 real-time? I thought querying rocksdb state is a widely requested 
 feature.
 It would be really great to consider this feature for 1.11

 3) Is there any interface where I can implement my own state
 backend?

 Thanks!


 On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:

> Hi Kant,
>
> 1) Yes, it will be stored in rocksdb statebackend.
> 2) In old planner, the left state is the same with right state
> which are both `>>`.
> It is a 2-level map stru

Re: Flink and Presto integration

2020-01-28 Thread Flavio Pompermaier
Hive Metastore is the de facto standard for Hadoop, but in my use case I
have to query other databases (like MySQL, Oracle and SQL Server).
So Presto would be a good choice (apart from the fact that you need to
restart it when you add a new catalog..), and I'd like to have an easy
translation of the catalogs.
Another fear I have is that I could have different versions of the same
database type (e.g. Oracle or SQL Server) and I'll probably hit an
incompatibility when using the latest jar of a connector.
From what I see, this corner case doesn't have a clear solution, but I have
some workarounds in mind that I need to verify (e.g. shade jars or allocate
source reader tasks to different Task Managers based on the deployed jar
versions..)

On Tue, Jan 28, 2020 at 11:05 AM Piotr Nowojski  wrote:

> Hi,
>
> Yes, Presto (in presto-hive connector) is just using hive Metastore to get
> the table definitions/meta data. If you connect to the same hive Metastore
> with Flink, both systems should be able to see the same tables.
>
> Piotrek
>
> On 28 Jan 2020, at 04:34, Jingsong Li  wrote:
>
> Hi Flavio,
>
> Your requirement should be to use blink batch to read the tables in Presto?
> I'm not familiar with Presto's catalog. Is it like hive Metastore?
>
> If so, what needs to be done is similar to the hive connector.
> You need to implement a catalog of presto, which translates the Presto
> table into a Flink table. You may need to deal with partitions, statistics,
> and so on.
>
> Best,
> Jingsong Lee
>
> On Mon, Jan 27, 2020 at 9:58 PM Itamar Syn-Hershko <
> ita...@bigdataboutique.com> wrote:
>
>> Yes, Flink does batch processing by "reevaluating a stream" so to speak.
>> Presto doesn't have sources and sinks, only catalogs (which are always
>> allowing reads, and sometimes also writes).
>>
>> Presto catalogs are a configuration - they are managed on the node
>> filesystem as a configuration file and nowhere else. Flink sources/sinks
>> are programmatically configurable and are compiled into your Flink program.
>> So that is not possible at the moment, and all that's possible to do is get
>> that info form the API of both products and visualize that. Definitely not
>> managing them from a single place.
>>
>> On Mon, Jan 27, 2020 at 3:54 PM Flavio Pompermaier 
>> wrote:
>>
>>> Both Presto and Flink make use of a Catalog in order to be able to
>>> read/write data from a source/sink.
>>> I don't agree about " Flink is about processing data streams" because
>>> Flink is competitive also for the batch workloads (and this will be further
>>> improved in the next releases).
>>> I'd like to register my data sources/sinks in one single catalog (E.g.
>>> Presto) and then being able to reuse it also in Flink (with a simple
>>> translation).
>>> My idea of integration here is thus more at catalog level since I would
>>> use Presto for exploring data from UI and Flink to process it because once
>>> the configuration part has finished (since I have many Flink jobs that I
>>> don't want to throw away or rewrite).
>>>
>>> On Mon, Jan 27, 2020 at 2:30 PM Itamar Syn-Hershko <
>>> ita...@bigdataboutique.com> wrote:
>>>
 Hi Flavio,

 Presto contributor and Starburst Partners here.

 Presto and Flink are solving completely different challenges. Flink is
 about processing data streams as they come in; Presto is about ad-hoc /
 periodic querying of data sources.

 A typical architecture would use Flink to process data streams and
 write data and aggregations to some data stores (Redis, MemSQL, SQLs,
 Elasticsearch, etc) and then using Presto to query those data stores (and
 possible also others using Query Federation).

 What kind of integration will you be looking for?

 On Mon, Jan 27, 2020 at 1:44 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi all,
> is there any integration between Presto and Flink? I'd like to use
> Presto for the UI part (preview and so on) while using Flink for the batch
> processing. Do you suggest something else otherwise?
>
> Best,
> Flavio
>


--
Itamar Syn-Hershko
CTO, Founder
+972-54-2467860
ita...@bigdataboutique.com
https://bigdataboutique.com

>>>
>>>
>>
>> --
>> Itamar Syn-Hershko
>> CTO, Founder
>> +972-54-2467860
>> ita...@bigdataboutique.com
>> https://bigdataboutique.com
>>
>
>
> --
> Best, Jingsong Lee
>
>
>


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread kant kodali
Hi Arvid,

I am trying to understand your statement. I am new to Flink so excuse me if
I don't know something I should have known. ProcessFunction just process
the records right? If so, how is it better than writing to an external
system? At the end of the day I want to be able to query it (doesn't have
to be through Queryable state and actually I probably don't want to use
Queryable state for its limitations). But ideally I want to be able to
query the intermediate states using SQL and hopefully, the store that is
maintaining the intermediate state has some sort of index support so the
read queries are faster than doing the full scan.

Also, I hear querying intermediate state just like one would in a database
is a widely requested feature, so it's a bit surprising that this is not
solved just yet, but I am hopeful!

Thanks!



On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise  wrote:

> Hi Kant,
>
> just wanted to mention the obvious. If you add a ProcessFunction right
> after the join, you could maintain a user state with the same result. That
> will of course blow up the data volume by a factor of 2, but may still be
> better than writing to an external system.
>
> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines
>> changed.
>> Thanks for the details, Jark!
>>
>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu  wrote:
>>
>>> Hi Kant,
>>> Having a custom state backend is very difficult and is not recommended.
>>>
>>> Hi Benoît,
>>> Yes, the "Query on the intermediate state is on the roadmap" I
>>> mentioned is referring to integrate Table API & SQL with Queryable State.
>>> We also have an early issue FLINK-6968 to tracks this.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
 Hi all!

 @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
 on the intermediate state is on the roadmap"?
 Are you referring to working on
 QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
 AS OF" [2], or on other APIs/concepts (is there a FLIP?)?

 Cheers
 Ben

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table


 On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:

> Is it a common practice to have a custom state backend? If so, what
> would be a popular custom backend?
>
> Can I use Elasticsearch as a state backend?
>
> Thanks!
>
> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> 1) List of row is also sufficient in this case. Using a MapState is
>> in order to retract a row faster, and save the storage size.
>>
>> 2) State Process API is usually used to process save point. I’m
>> afraid the performance is not good to use it for querying.
>> On the other side, AFAIK, State Process API requires the uid of
>> operator. However, uid of operators is not set in Table API & SQL.
>> So I’m not sure whether it works or not.
>>
>> 3)You can have a custom statebackend by
>> implement org.apache.flink.runtime.state.StateBackend interface, and use 
>> it
>> via `env.setStateBackend(…)`.
>>
>> Best,
>> Jark
>>
>> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>>
>>> Hi Jark,
>>>
>>> 1) shouldn't it be a col1 to List of row? multiple rows can have the
>>> same joining key right?
>>>
>>> 2) Can I use state processor API
>>> 
>>> from an external application to query the intermediate results in near
>>> real-time? I thought querying rocksdb state is a widely requested 
>>> feature.
>>> It would be really great to consider this feature for 1.11
>>>
>>> 3) Is there any interface where I can implement my own state backend?
>>>
>>> Thanks!
>>>
>>>
>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>>>
 Hi Kant,

 1) Yes, it will be stored in rocksdb statebackend.
 2) In old planner, the left state is the same with right state
 which are both `>>`.
 It is a 2-level map structure, where the `col1` is the join
 key, it is the first-level key of the state. The key of the MapState 
 is the
 input row,
 and the `count` is the number of this row, the expiredTime
 indicates when to cleanup this row (avoid infinite state size). You can
 find the source code here[1].
 In blink planner, the state structure will be more complex
 which is determined by the meta-inf

Re: REST rescale with Flink on YARN

2020-01-28 Thread Gary Yao
Hi,

You can use

yarn application -status 

to find the host and port that the server is listening on (AM host & RPC
Port). If you need to access that information programmatically, take a look
at
the YarnClient [1].

Best,
Gary


[1]
https://hadoop.apache.org/docs/r2.8.5/api/org/apache/hadoop/yarn/client/api/YarnClient.html

On Thu, Jan 23, 2020 at 3:21 PM Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all,
> I've found a solution for this issue.
> The problem is that with the YARN ApplicationMaster URL we communicate with
> the JobManager via a proxy which is implemented on Jetty 6 (for Hadoop 2.6).
> So to use the PATCH method we need to locate the original JobManager URL.
> Using the /jobmanager/config API we could get only the host, but web.port is
> displayed as 0 (???)
> To find the actual web port, we have to parse the YARN logs for the JobManager,
> where we can find something like this:
>
> *INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Rest
> endpoint listening at :.*
>
> Does anyone know a less complicated way to find the actual REST URL under
> YARN?
>
>
>
>
> Best regards,
> Vasily Melnik
>
>
> On Thu, 23 Jan 2020 at 15:32, Chesnay Schepler  wrote:
>
>> Older versions of Jetty don't support PATCH requests. You will either
>> have to update it or create a custom Flink version that uses POST for the
>> rescale operation.
>>
>> On 23/01/2020 13:23, Vasily Melnik wrote:
>>
>> Hi all.
>> I'm using Flink 1.8 on YARN with CDH 5.12
>> When i try to perform rescale request:
>>
>> curl -v -X PATCH 
>> '/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3
>>  
>> '
>>
>> I get an error:
>>
>> *Method PATCH is not defined in RFC 2068 and is not supported by the
>> Servlet API*. GET and POST methods work well.
>> The Server type in response is Jetty(6.1.26.cloudera.4).
>>
>> How can I deal with this situation?
>>
>> Best regards,
>> Vasily Melnik
>>
>>
>>


Re: FileStreamingSink is using the same counter for different files

2020-01-28 Thread Kostas Kloudas
Hi Pawel,

You are correct that the write method invocation is guaranteed to be
thread safe for the same sub operator instance.
But I am not sure if having a unique counter per subtask across
buckets would add much to the user experience of the sink.
I think that in both cases, the interpretation of the part files would
be the same.

I may be wrong though so please let me know if this is a deal breaker for you.

Cheers,
Kostas
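
For readers following along, a minimal sketch of a StreamingFileSink whose part files follow the "part-<subtaskIndex>-<counter>" naming discussed in this thread (the path and encoder are placeholders, not the poster's actual job):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class FileSinkExample {
        public static void addSink(DataStream<String> lines) {
            StreamingFileSink<String> sink = StreamingFileSink
                // Row format with the default time-based bucket assigner; within each
                // bucket, files are named part-<subtaskIndex>-<counter>, e.g. part-0-6.
                .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();
            lines.addSink(sink);
        }
    }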


On Sat, Jan 25, 2020 at 11:48 AM Pawel Bartoszek
 wrote:
>
> Hi Kostas,
>
> Thanks for confirming that. I started thinking it might be useful or more
> user friendly to use a unique counter across buckets for the same operator
> subtask?
> The way I could imagine this working is to pass the max counter to the
> https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204
> write method, or to have each Bucket hold an instance of the Buckets class and
> access the global counter from there. As far as I know, the write method
> invocation is guaranteed to be thread safe for the same sub operator instance.
>
> Thanks,
> Pawel
>
>
> On Fri, 24 Jan 2020 at 20:45, Kostas Kloudas  wrote:
>>
>> Hi Pawel,
>>
>> You are correct that counters are unique within the same bucket but
>> NOT across buckets. Across buckets, you may see the same counter being
>> used.
>> The max counter is used only upon restoring from a failure, resuming
>> from a savepoint or rescaling and this is done to guarantee that n
>> valid data are overwritten while limiting the state that Flink has to
>> keep internally. For a more detailed discussion about the why, you can
>> have a look here: https://issues.apache.org/jira/browse/FLINK-13609
>>
>> Cheers,
>> Kostas
>>
>> On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek
>>  wrote:
>> >
>> > I have looked into the source code and it looks like the same
>> > counter value being used in two buckets is correct.
>> > Each Bucket class
>> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
>> >  is passed partCounter in the constructor. Whenever a part file is rolled
>> > over, the counter is incremented within the scope of this bucket. It can
>> > happen that there are two or more active buckets and the counter is increased
>> > independently inside them so that they become equal. However, the global
>> > max counter maintained by the Buckets class always keeps the max part counter
>> > so that when a new bucket is created it is passed the correct part counter.
>> >
>> > I have done my analysis based on the logs from my job. I highlighted the 
>> > same counter value used for part-0-8.
>> >
>> > 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 received completion notification for checkpoint with id=7.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
>> > checkpoint.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
>> > bucketPath=s3://xxx
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
>> > element
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 received completion notification for checkpoint with id=8.
>> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to 
>> > element
>> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
>> > 2020-01-24 14:58:41 [Async S

Re: Flink and Presto integration

2020-01-28 Thread Piotr Nowojski
Hi,

Yes, Presto (in presto-hive connector) is just using hive Metastore to get the 
table definitions/meta data. If you connect to the same hive Metastore with 
Flink, both systems should be able to see the same tables.

Piotrek
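
A hedged sketch of what pointing Flink at the same Metastore could look like (the catalog name, conf directory and Hive version below are placeholders; assumes flink-connector-hive and the matching Hive dependencies are on the classpath):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class SharedMetastoreExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

            // Placeholder names/paths: point at the same Metastore Presto uses.
            HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4");
            tableEnv.registerCatalog("myhive", hiveCatalog);
            tableEnv.useCatalog("myhive");
            // Tables defined in the shared Metastore are now visible to both systems.
        }
    }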

> On 28 Jan 2020, at 04:34, Jingsong Li  wrote:
> 
> Hi Flavio,
> 
> Your requirement should be to use blink batch to read the tables in Presto?
> I'm not familiar with Presto's catalog. Is it like hive Metastore?
> 
> If so, what needs to be done is similar to the hive connector.
> You need to implement a catalog of presto, which translates the Presto table 
> into a Flink table. You may need to deal with partitions, statistics, and so 
> on.
> 
> Best,
> Jingsong Lee
> 
> On Mon, Jan 27, 2020 at 9:58 PM Itamar Syn-Hershko <ita...@bigdataboutique.com> wrote:
> Yes, Flink does batch processing by "reevaluating a stream" so to speak. 
> Presto doesn't have sources and sinks, only catalogs (which are always 
> allowing reads, and sometimes also writes).
> 
> Presto catalogs are a configuration - they are managed on the node filesystem 
> as a configuration file and nowhere else. Flink sources/sinks are 
> programmatically configurable and are compiled into your Flink program. So 
> that is not possible at the moment, and all that's possible to do is get that 
> info form the API of both products and visualize that. Definitely not 
> managing them from a single place.
> 
> On Mon, Jan 27, 2020 at 3:54 PM Flavio Pompermaier wrote:
> Both Presto and Flink make use of a Catalog in order to be able to read/write 
> data from a source/sink.
> I don't agree about " Flink is about processing data streams" because Flink 
> is competitive also for the batch workloads (and this will be further 
> improved in the next releases).
> I'd like to register my data sources/sinks in one single catalog (E.g. 
> Presto) and then being able to reuse it also in Flink (with a simple 
> translation).
> My idea of integration here is thus more at catalog level since I would use 
> Presto for exploring data from UI and Flink to process it because once the 
> configuration part has finished (since I have many Flink jobs that I don't 
> want to throw away or rewrite).
> 
> On Mon, Jan 27, 2020 at 2:30 PM Itamar Syn-Hershko <ita...@bigdataboutique.com> wrote:
> Hi Flavio,
> 
> Presto contributor and Starburst Partners here.
> 
> Presto and Flink are solving completely different challenges. Flink is about 
> processing data streams as they come in; Presto is about ad-hoc / periodic 
> querying of data sources.
> 
> A typical architecture would use Flink to process data streams and write data 
> and aggregations to some data stores (Redis, MemSQL, SQLs, Elasticsearch, 
> etc) and then using Presto to query those data stores (and possible also 
> others using Query Federation).
> 
> What kind of integration will you be looking for?
> 
> On Mon, Jan 27, 2020 at 1:44 PM Flavio Pompermaier wrote:
> Hi all,
> is there any integration between Presto and Flink? I'd like to use Presto for 
> the UI part (preview and so on) while using Flink for the batch processing. 
> Do you suggest something else otherwise?
> 
> Best,
> Flavio
> 
> 
> -- 
> 
>     
> Itamar Syn-Hershko
> CTO, Founder
> +972-54-2467860 
> ita...@bigdataboutique.com 
> https://bigdataboutique.com 
>     
>    
>   
> 
> 
> 
> --
> Itamar Syn-Hershko
> CTO, Founder
> +972-54-2467860
> ita...@bigdataboutique.com
> https://bigdataboutique.com
> 
> 
> -- 
> Best, Jingsong Lee



Re: Blocking KeyedCoProcessFunction.processElement1

2020-01-28 Thread Taher Koitawala
Would AsyncIO operator not be an option for you to connect to RDBMS?

On Tue, Jan 28, 2020, 12:45 PM Alexey Trenikhun  wrote:

> Thank you Yun Tang.
> My implementation potentially could block for significant amount of time,
> because I wanted to do RDBMS maintenance (create partitions for new data,
> purge old data etc) in-line with writing stream data to a database
>
> --
> *From:* Yun Tang 
> *Sent:* Sunday, January 26, 2020 8:42:37 AM
> *To:* Alexey Trenikhun ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Blocking KeyedCoProcessFunction.processElement1
>
> Hi Alexey
>
> Actually, I don't understand why you think
> KeyedCoProcessFunction#processElement1 would block for significant amount
> of time, it just process record from the elements in the first input stream
> which is necessary. If you really find it would block for a long time, I
> think that's because your processing logic has some problem to stuck. On
> the other hand, since processing checkpoint and records hold the same lock,
> we cannot process checkpoint when the record processing logic did not
> release the lock.
>
> Best
> Yun Tang
> --
> *From:* Alexey Trenikhun 
> *Sent:* Thursday, January 23, 2020 13:04
> *To:* user@flink.apache.org 
> *Subject:* Blocking KeyedCoProcessFunction.processElement1
>
>
> Hello,
> If KeyedCoProcessFunction.processElement1 blocks for significant amount of
> time, will it prevent checkpoint ?
>
> Thanks,
> Alexey
>


Re: Flink RocksDB logs filling up disk space

2020-01-28 Thread Ahmad Hassan
Hi Yun,

Thank you for pointing that out. In our production landscapes with live
customers, we have 10 second checkpoint interval and 7MB of average
checkpoint size. We do incremental checkpoints. If we keep the checkpoint
interval longer (i.e. 1 minute) then the kafka consumer lag starts
increasing. The reason is that over the period of 1 minute, the checkpoint
size grows and the job takes a long time to do the checkpoint and as a result
kafka consumer lag for our live traffic goes high. In order to keep the
checkpoint size small, we tried the 10 second option which is working out well
and our kafka lag never exceeds 20 messages on average. But I agree
with you that 10 second option does not feel right and is too frequent in
my opinion.

Do you have any recommendations for checkpointing interval please ?

Best Regards,


On Tue, 28 Jan 2020 at 07:46, Yun Tang  wrote:

> Hi Ahmad
>
> Apart from setting the logger level of RocksDB, I also wonder why you
> would see RocksDB checkpoint IO logs filling up disk space very
> quickly. How large is the local checkpoint state and how long is the
> checkpoint interval? I think you might have given too short a checkpoint
> interval; even if you could avoid recording too many logs, I don't think
> the current checkpoint configuration is appropriate.
>
> Best
> Yun Tang
> --
> *From:* Ahmad Hassan 
> *Sent:* Monday, January 27, 2020 20:22
> *To:* user 
> *Subject:* Re: Flink RocksDB logs filling up disk space
>
>
> Thanks Chesnay!
>
> On Mon, 27 Jan 2020 at 11:29, Chesnay Schepler  wrote:
>
> Please see https://issues.apache.org/jira/browse/FLINK-15068
>
> On 27/01/2020 12:22, Ahmad Hassan wrote:
>
> Hello,
>
> In our production systems, we see that flink rocksdb checkpoint IO logs
> are filling up disk space very very quickly in the order of GB's as the
> logging is very verbose. How do we disable or suppress these logs please ?
> The rocksdb file checkpoint.cc is dumping huge amount of checkpoint logs
> like
>
> Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());
>
>
> Best Regards,
>
>
>


Re: SideOutput Exception: "Output Tag must not be null"

2020-01-28 Thread Arvid Heise
Hi Izual,

it seems that the code example is not complete. I'm assuming backupOutputTag
is actually a field within your application class.

If you look at the examples, you will notice that backupOutputTag should be
defined within the method that defines your topology and not on the
wrapping object.
So drop the private modifier and move the definition inside the function.
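
A small sketch of that shape (names are illustrative): the tag is declared inside the method that builds the topology, typically as an anonymous subclass ({}) so its type information is captured, and the same instance is used for ctx.output and getSideOutput.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SideOutputExample {
        public static DataStream<String> buildTopology(DataStream<String> input) {
            // Declared inside the method that defines the topology, as an anonymous subclass.
            final OutputTag<String> backupOutputTag = new OutputTag<String>("backup") {};

            SingleOutputStreamOperator<String> main =
                input.process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        out.collect(value);
                        ctx.output(backupOutputTag, value); // route a copy to the side output
                    }
                });

            return main.getSideOutput(backupOutputTag);
        }
    }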


Re: Blocking KeyedCoProcessFunction.processElement1

2020-01-28 Thread Arvid Heise
Hi Alexey,

we cannot perform a checkpoint on a UDF that is still being called, as we
would not be able to take a consistent snapshot. You could potentially have
changed the state, so if we replay the event during recovery, you may get
inexact results. For example, consider a simple counter where you just
count all data coming in from input1. If we checkpointed during the UDF
invocation, you could have already incremented the counter, such that upon
recovery, you would count that input record twice.
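
To make that concrete, a minimal sketch of such a counter (all types and names
are invented for illustration):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Counts records from input1 per key. If a checkpoint could be taken in the middle of
// processElement1, the incremented count might end up in the snapshot while the input
// record is replayed after recovery, i.e. it would be counted twice.
public class CountingFunction extends KeyedCoProcessFunction<String, String, String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<Long> out) throws Exception {
        long current = count.value() == null ? 0L : count.value();
        count.update(current + 1); // state change that must stay consistent with the input offset
        out.collect(current + 1);
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<Long> out) {
        // input2 is ignored in this sketch
    }
}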

This conceptual inability to checkpoint during a UDF call is completely
independent of the implementation constraints that Yun outlined; much of
that has changed in 1.10 and will further improve in 1.11.

So coming back to your question, it's generally a bad idea to do heavy
computation within one UDF call. If that maintenance work needs to be done
prior to any record processing, `open` sounds more plausible. I'd even
consider doing that work in your `main` if it doesn't need to be redone
during recovery.

If you need access to the records (to create new partitions), I'd go with a
prepended asyncIO. It's specifically built around interactions with
external systems and supports intermediate checkpoints (at the cost that
users may not use user state).
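
A rough sketch of what I mean (the event type, the maintenance call, and the
timeout/capacity values are hypothetical placeholders):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MaintenanceAsyncSketch {

    // hypothetical event type; not from the original thread
    public static class Event {
        public String key;
    }

    public static class PartitionMaintenance extends RichAsyncFunction<Event, Event> {
        @Override
        public void asyncInvoke(Event input, ResultFuture<Event> resultFuture) {
            CompletableFuture
                .runAsync(() -> ensurePartitionExists(input))   // blocking RDBMS work off the task thread
                .thenRun(() -> resultFuture.complete(Collections.singleton(input)));
        }

        private void ensurePartitionExists(Event event) {
            // create new / purge old RDBMS partitions here (placeholder)
        }
    }

    public static DataStream<Event> withMaintenance(DataStream<Event> events) {
        // run the maintenance asynchronously in front of the actual database write
        return AsyncDataStream.unorderedWait(events, new PartitionMaintenance(), 30, TimeUnit.SECONDS, 10);
    }
}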

-- Arvid

On Tue, Jan 28, 2020 at 8:50 AM Yun Tang  wrote:

> Hi Alexey
>
> If possible, I think you could move some RDBMS maintenance operations to
> the #open method of RichFunction to reduce the possibility of blocking
> processing records.
>
> Best
> Yun Tang
> --
> *From:* Alexey Trenikhun 
> *Sent:* Tuesday, January 28, 2020 15:15
> *To:* Yun Tang ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Blocking KeyedCoProcessFunction.processElement1
>
> Thank you Yun Tang.
> My implementation could potentially block for a significant amount of time,
> because I wanted to do RDBMS maintenance (creating partitions for new data,
> purging old data, etc.) in-line with writing stream data to a database.
>
> --
> *From:* Yun Tang 
> *Sent:* Sunday, January 26, 2020 8:42:37 AM
> *To:* Alexey Trenikhun ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Blocking KeyedCoProcessFunction.processElement1
>
> Hi Alexey
>
> Actually, I don't understand why you think
> KeyedCoProcessFunction#processElement1 would block for a significant amount
> of time; it just processes records from the first input stream, which is
> necessary. If you really find it blocks for a long time, I think that's
> because your processing logic gets stuck somewhere. On
> the other hand, since checkpoint processing and record processing hold the same
> lock, we cannot process a checkpoint while the record processing logic has not
> released the lock.
>
> Best
> Yun Tang
> --
> *From:* Alexey Trenikhun 
> *Sent:* Thursday, January 23, 2020 13:04
> *To:* user@flink.apache.org 
> *Subject:* Blocking KeyedCoProcessFunction.processElement1
>
>
> Hello,
> If KeyedCoProcessFunction.processElement1 blocks for a significant amount of
> time, will it prevent checkpoints?
>
> Thanks,
> Alexey
>


Re: Is there anything strictly special about sink functions?

2020-01-28 Thread Arvid Heise
As Konstantin said, you need to use a sink, but you could use
`org.apache.flink.streaming.api.functions.sink.DiscardingSink`.

There is nothing inherently wrong with outputting things through a UDF. You
need to solve the same challenges as in a SinkFunction: you need to
implement your own state management. Also make sure that you can handle
duplicates occurring during recovery after a restart.
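
For illustration, a minimal sketch of the DiscardingSink variant (the println
stands in for whatever external write you actually do):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.Collector;

public class DiscardingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            // the "sink-like" work happens inside the ProcessFunction; nothing useful is emitted
            .process(new ProcessFunction<String, Void>() {
                @Override
                public void processElement(String value, Context ctx, Collector<Void> out) {
                    System.out.println("writing " + value); // stand-in for the external write
                }
            })
            // the no-op sink terminates the pipeline so that execution is triggered
            .addSink(new DiscardingSink<>());

        env.execute("process-then-discard");
    }
}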

On Tue, Jan 28, 2020 at 6:43 AM Konstantin Knauf 
wrote:

> Hi Andrew,
>
> as far as I know, there is nothing particularly special about the sink in
> terms of how it handles state or time. You cannot leave the pipeline
> "unfinished"; only sinks trigger the execution of the whole pipeline.
>
> Cheers,
>
> Konstantin
>
>
>
> On Fri, Jan 24, 2020 at 5:59 PM Andrew Roberts  wrote:
>
>> Hello,
>>
>> I’m trying to push some behavior that we’ve currently got in a large,
>> stateful SinkFunction implementation into Flink’s windowing system. The
>> task at hand is similar to what StreamingFileSink provides, but more
>> flexible. I don’t want to re-implement that sink, because it uses the
>> StreamingRuntimeContext.getProcessingTimeService() via a cast - that class
>> is marked as internal, and I’d like to avoid the exposure to an interface
>> that could change. Extending it similarly introduces complexity I would
>> rather not add to our codebase.
>>
>> WindowedStream.process() provides more or less the pieces I need, but the
>> stream continues on after a ProcessFunction - there’s no way to process()
>> directly into a sink. I could use a ProcessFunction[In, Unit, Key, Window],
>> and follow that immediately with a no-op sink that discards the Unit
>> values, or I could just leave the stream “unfinished," with no sink.
>>
>> Is there a downside to either of these approaches? Is there anything
>> special about doing sink-like work in a ProcessFunction or FlatMapFunction
>> instead of a SinkFunction?
>>
>> Thanks,
>>
>> Andrew
>>
>>
>>
>>
>
>


Re: batch job OOM

2020-01-28 Thread Arvid Heise
Hi Fanbin,

you could use the RC1 of Flink 1.10 that was created yesterday and pull the
dependencies from the Apache staging repo
https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/
.
Alternatively, if you build Flink locally with `mvn install`, you could use
mavenLocal() in your build.gradle and resolve the dependencies from there.

Best,

Arvid

On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu  wrote:

> I can build Flink 1.10 and install it onto EMR
> (flink-dist_2.11-1.10.0.jar), but what about the other dependencies in my
> project build.gradle, i.e. flink-scala_2.11, flink-json, flink-jdbc... do I
> continue to use 1.9.0 since there is no 1.10 available?
>
> Thanks,
> Fanbin
>
> On Fri, Jan 24, 2020 at 11:39 PM Bowen Li  wrote:
>
>> Hi Fanbin,
>>
>> You can install your own Flink build in AWS EMR, and it frees you from
>> Emr’s release cycles
>>
>> On Thu, Jan 23, 2020 at 03:36 Jingsong Li  wrote:
>>
>>> Fanbin,
>>>
>>> I have no idea right now; can you create a JIRA to track it? You can describe the
>>> complete SQL and some information about the data.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 Do you have any suggestions to debug the above mentioned
 IndexOutOfBoundsException error?
 Thanks,

 Fanbin

 On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu 
 wrote:

> I got the following error when running another job. any suggestions?
>
> Caused by: java.lang.IndexOutOfBoundsException
> at
> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
> at
> org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
> at HashWinAggWithKeys$538.endInput(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>
> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> I had set the config value too large. After I changed it to a
>> smaller number it works now!
>> Thank you for the help, really appreciate it!
>>
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
>> wrote:
>>
>>> Fanbin,
>>>
>>> Looks like your config is wrong, can you show your config code?
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 Great, now i got a different error:

 java.lang.NullPointerException: Initial Segment may not be null
at 
 org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
at 
 org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
at 
 org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
at 
 org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
at 
 org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
at LocalHashWinAggWithKeys$292.open(Unknown Source)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at 
 org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


 is there any other config i should add?

 thanks,

 Fanbin


 On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
 wrote:

> you beat me to it.
> let's me try that.
>
> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <
> jingsongl...@gmail.com> wrote:
>
>> Fanbin,
>>
>> Document is here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>> NOTE: yo

Re: PostgreSQL JDBC connection drops after inserting some records

2020-01-28 Thread Fabian Hueske
Hi,

The exception is thrown by Postgres.
I'd start investigating there what the problem is.

Maybe you need to tweak your Postgres configuration, but it might also be
that the Flink connector needs to be configured differently.
If a necessary config option is missing from the connector, it would be good to add it.

However, at this point it's not clear why Postgres fails.
I'd recommend checking the Postgres exception and figuring out why it is
failing.
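
If it turns out to be a batching or flush issue, the relevant knobs on the
Flink side sit on the JDBCOutputFormat builder. A sketch with placeholder
connection settings (the column list is copied from your error message):

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class JdbcSinkConfigSketch {

    static JDBCOutputFormat buildOutputFormat() {
        return JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://db-host:5432/mydb")   // placeholder
            .setUsername("user")                               // placeholder
            .setPassword("secret")                             // placeholder
            .setQuery("INSERT INTO csv_data (asset, tag, t, q, v, backfill, createdAt, createdBy) "
                    + "VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING")
            .setBatchInterval(500)   // number of rows buffered before executeBatch() is called
            .finish();
    }
    // usage: rowStream.writeUsingOutputFormat(buildOutputFormat());
}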

Cheers,
Fabian


On Tue, Jan 28, 2020 at 09:02, Arvid Heise  wrote:

> Hi Soheil,
>
> what is your actual question? Did the application eventually finish or
> does it keep restarting?
>
> In general, communication with external systems may fail from time to
> time. Only if it persists would we explore it further. If it is very rare, a
> restart should already help.
>
> Best,
>
> Arvid
>
> On Thu, Jan 23, 2020 at 5:35 PM Soheil Pourbafrani 
> wrote:
>
>> Hi,
>> I have a peace of Flink Streaming code that reads data from files and
>> inserts them into the PostgreSQL table. After inserting 6 to 11 million
>> records, I got the following errors:
>>
>> *Caused by: java.lang.RuntimeException: Execution of JDBC statement
>> failed. at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
>> at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
>> at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
>> at
>> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
>> at
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>> ... 15 moreCaused by: java.sql.BatchUpdateException: Batch entry 0 INSERT
>> INTO csv_data(asset, tag, t, q, v, backfill, createdAt, createdBy) VALUES
>> ('SST', 'XC_XC', '2015-04-11 21:36:23+03', 12.0, '1.00', 'FALSE',
>> '2020-01-23 19:22:14.469+03', 'system') ON CONFLICT DO NOTHING was aborted:
>> An I/O error occurred while sending to the backend.  Call getNextException
>> to see other errors in the batch. at
>> org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
>> at
>> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:515)
>> at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:853) at
>> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
>> at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>> ... 21 moreCaused by: org.postgresql.util.PSQLException: An I/O error
>> occurred while sending to the backend. at
>> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:516)
>> ... 24 moreCaused by: java.io.EOFException at
>> org.postgresql.core.PGStream.receiveChar(PGStream.java:337) at
>> org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2000)*
>> at
>> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:510)
>> ... 24 more
>>
>> However, as I have enabled the restart strategy, the app is automatically
>> restarted and reconnects to the database.
>> My code simply reads data from files and, after transforming it into the
>> table schema, inserts the rows into the table.
>>
>> It would be great if anyone can help me with this
>> Thanks
>>
>


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread Arvid Heise
Hi Kant,

just wanted to mention the obvious. If you add a ProcessFunction right
after the join, you could maintain a user state with the same result. That
will of course blow up the data volume by a factor of 2, but may still be
better than writing to an external system.
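
A rough sketch of that idea, assuming the join result is available as a
DataStream<Row> and the join key sits in field 0 (both assumptions are only
for illustration):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class KeepJoinResultSketch {

    // keeps the latest joined row per key in user state, mirroring the join's internal state
    public static class KeepLatest extends KeyedProcessFunction<String, Row, Row> {
        private transient ValueState<Row> latest;

        @Override
        public void open(Configuration parameters) {
            latest = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-joined-row", Row.class));
        }

        @Override
        public void processElement(Row joined, Context ctx, Collector<Row> out) throws Exception {
            latest.update(joined); // duplicate of the join result, accessible from user code
            out.collect(joined);
        }
    }

    public static DataStream<Row> keepLatest(DataStream<Row> joinedStream) {
        return joinedStream
            .keyBy(new KeySelector<Row, String>() {
                @Override
                public String getKey(Row row) {
                    return (String) row.getField(0); // assumed join key position
                }
            })
            .process(new KeepLatest());
    }
}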

On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines
> changed.
> Thanks for the details, Jark!
>
> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu  wrote:
>
>> Hi Kant,
>> Having a custom state backend is very difficult and is not recommended.
>>
>> Hi Benoît,
>> Yes, the "Query on the intermediate state is on the roadmap" I
>> mentioned is referring to integrate Table API & SQL with Queryable State.
>> We also have an early issue FLINK-6968 to tracks this.
>>
>> Best,
>> Jark
>>
>>
>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hi all!
>>>
>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>>> on the intermediate state is on the roadmap"?
>>> Are you referring to working on
>>> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
>>> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>
>>> Cheers
>>> Ben
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>
>>>
>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:
>>>
 Is it a common practice to have a custom state backend? If so, what
 would be a popular custom backend?

 Can I use Elasticsearch as a state backend?

 Thanks!

 On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:

> Hi Kant,
>
> 1) A list of rows would also be sufficient in this case. Using a MapState is in
> order to retract a row faster and save storage size.
>
> 2) The State Processor API is usually used to process savepoints. I'm afraid
> the performance is not good enough to use it for querying.
> On the other side, AFAIK, the State Processor API requires the uid of the
> operator. However, uids of operators are not set in Table API & SQL.
> So I'm not sure whether it works or not.
>
> 3) You can have a custom state backend by implementing the
> org.apache.flink.runtime.state.StateBackend interface, and use it
> via `env.setStateBackend(…)`.
>
> Best,
> Jark
>
> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>
>> Hi Jark,
>>
>> 1) shouldn't it be a col1 to List of row? multiple rows can have the
>> same joining key right?
>>
>> 2) Can I use the State Processor API
>> from an external application to query the intermediate results in near
>> real-time? I thought querying rocksdb state is a widely requested 
>> feature.
>> It would be really great to consider this feature for 1.11
>>
>> 3) Is there any interface where I can implement my own state backend?
>>
>> Thanks!
>>
>>
>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>>
>>> Hi Kant,
>>>
>>> 1) Yes, it will be stored in rocksdb statebackend.
> 2) In the old planner, the left state has the same structure as the right
> state.
>>> It is a 2-level map structure, where the `col1` is the join key,
>>> it is the first-level key of the state. The key of the MapState is the
>>> input row,
>>> and the `count` is the number of this row, the expiredTime
>>> indicates when to cleanup this row (avoid infinite state size). You can
>>> find the source code here[1].
>>> In blink planner, the state structure will be more complex which
>>> is determined by the meta-information of upstream. You can see the 
>>> source
>>> code of blink planner here [2].
>>> 3) Currently, the intermediate state is not exposed to users.
>>> Usually, users should write the query result to an external system (like
>>> Mysql) and query the external system.
>>> Query on the intermediate state is on the roadmap, but I guess
>>> it is not in 1.11 plan.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>
>>>
>>> On Jan 21, 2020 at 18:01, kant kodali  wrote:
>>>
>>> Hi All,
>>>
>>> If I run a query like this
>>>
>>> StreamTableEnvironment.sqlQuery("select * from table1 join ta

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-28 Thread Arvid Heise
Hi Mark,

if you add `fs.s3a.fast.upload.buffer: true` to your Flink configuration,
it should add that to the respective Hadoop configuration when creating the
file system.
Note, I haven't tried it but all keys with the prefixes "s3.", "s3a.",
"fs.s3a." should be forwarded.

-- Arvid

On Mon, Jan 27, 2020 at 5:16 PM Piotr Nowojski  wrote:

> Hi,
>
> I think reducing the frequency of the checkpoints and decreasing
> parallelism of the things using the S3AOutputStream class, would help to
> mitigate the issue.
>
> I don’t know about other solutions. I would suggest to ask this question
> directly to Steve L. in the bug ticket [1], as he is the one that fixed the
> issue. If there is no workaround, maybe it would be possible to put a
> pressure on the Hadoop guys to back port the fix to older versions?
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/HADOOP-15658
>
> On 27 Jan 2020, at 15:41, Cliff Resnick  wrote:
>
> I know from experience that Flink's shaded S3A FileSystem does not
> reference core-site.xml, though I don't remember offhand what file(s) it
> does reference. However since it's shaded, maybe this could be fixed by
> building a Flink FS referencing 3.3.0? Last I checked I think it referenced
> 3.1.0.
>
> On Mon, Jan 27, 2020, 8:48 AM David Magalhães 
> wrote:
>
>> Does StreamingFileSink use core-site.xml ? When I was using it, it didn't
>> load any configurations from core-site.xml.
>>
>> On Mon, Jan 27, 2020 at 12:08 PM Mark Harris 
>> wrote:
>>
>>> Hi Piotr,
>>>
>>> Thanks for the link to the issue.
>>>
>>> Do you know if there's a workaround? I've tried setting the following in
>>> my core-site.xml:
>>>
>>> ​fs.s3a.fast.upload.buffer=true
>>>
>>> To try and avoid writing the buffer files, but the taskmanager breaks
>>> with the same problem.
>>>
>>> Best regards,
>>>
>>> Mark
>>> --
>>> *From:* Piotr Nowojski  on behalf of Piotr
>>> Nowojski 
>>> *Sent:* 22 January 2020 13:29
>>> *To:* Till Rohrmann 
>>> *Cc:* Mark Harris ; flink-u...@apache.org <
>>> flink-u...@apache.org>; kkloudas 
>>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>>> hooks for S3a files
>>>
>>> Hi,
>>>
>>> This is probably a known issue of Hadoop [1]. Unfortunately it was only
>>> fixed in 3.3.0.
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/HADOOP-15658
>>>
>>> On 22 Jan 2020, at 13:56, Till Rohrmann  wrote:
>>>
>>> Thanks for reporting this issue Mark. I'm pulling Klou into this
>>> conversation who knows more about the StreamingFileSink. @Klou does the
>>> StreamingFileSink relies on DeleteOnExitHooks to clean up files?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jan 21, 2020 at 3:38 PM Mark Harris 
>>> wrote:
>>>
>>> Hi,
>>>
>>> We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs
>>> hadoop v "Amazon 2.8.5". We've recently noticed that some TaskManagers fail
>>> (causing all the jobs running on them to fail) with an
>>> "java.lang.OutOfMemoryError: GC overhead limit exceeded”. The taskmanager
>>> (and jobs that should be running on it) remain down until manually
>>> restarted.
>>>
>>> I managed to take and analyze a memory dump from one of the afflicted
>>> taskmanagers.
>>>
>>> It showed that 85% of the heap was made up of
>>> the java.io.DeleteOnExitHook.files hashset. The majority of the strings in
>>> that hashset (9041060 out of ~9041100) pointed to files that began
>>> /tmp/hadoop-yarn/s3a/s3ablock
>>>
>>> The problem seems to affect jobs that make use of the StreamingFileSink
>>> - all of the taskmanager crashes have been on the taskmaster running at
>>> least one job using this sink, and a cluster running only a single
>>> taskmanager / job that uses the StreamingFileSink crashed with the GC
>>> overhead limit exceeded error.
>>>
>>> I've had a look for advice on handling this error more broadly without
>>> luck.
>>>
>>> Any suggestions or advice gratefully received.
>>>
>>> Best regards,
>>>
>>> Mark Harris
>>>
>>>
>>>
>>> The information contained in or attached to this email is intended only
>>> for the use of the individual or entity to which it is addressed. If you
>>> are not the intended recipient, or a person responsible for delivering it
>>> to the intended recipient, you are not authorised to and must not disclose,
>>> copy, distribute, or retain this message or any part of it. It may contain
>>> information which is confidential and/or covered by legal professional or
>>> other privilege under applicable law.
>>>
>>> The views expressed in this email are not necessarily the views of
>>> Centrica plc or its subsidiaries, and the company, its directors, officers
>>> or employees make no representation or accept any liability for its
>>> accuracy or completeness unless expressly stated to the contrary.
>>>
>>> Additional regulatory disclosures may be found here:
>>> https://www.centrica.com/privacy-cookies-and-legal-disclaimer#email
>>>
>>> PH Jones is a trading name of British Gas Social Housing Limited.
>>

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-28 Thread Arvid Heise
Sorry, I meant to respond to Senthil. Thank you, Aaron, for providing help.

Also, one more thing that may be confusing the first time you use plugins:
you need to put plugins in their own folders. We improved the documentation
in the upcoming 1.10 release:

flink-dist
├── conf
├── lib
...
└── plugins
└── s3 (name is arbitrary)
└── flink-s3-fs-hadoop.jar


On Tue, Jan 28, 2020 at 9:18 AM Arvid Heise  wrote:

> Hi Aaron,
>
> I encountered a similar issue when running on EMR. On the slaves, there are
> some lingering Hadoop versions older than 2.7 (it was 2.6 if I remember
> correctly), which bleed into Flink's classpath.
> Flink checks the Hadoop version to determine whether certain capabilities
> like file truncation are available. In your case, it loads the version info
> of the old jar first and determines that file truncation is not available.
>
> I'm currently working on a fix [1]. Then, Flink will only check the Hadoop
> version of the Hadoop that is bundled in the plugin and not the one that
> comes first in the classpath.
>
> As a workaround, you could try the following options:
> - Don't use s3 as a plugin but put the jar in lib/ (that's not working
> anymore in Flink 1.10+ though)
> - Or connect to your slaves, find the old hadoop-common*.jar, and remove it
> manually. The location of all relevant hadoop-common*.jar files should be
> visible in the task manager logs. If they have a version number >2.7 they
> are good; all others may result in issues.
>
> Ping me if you need further assistance.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15777
>
> On Fri, Jan 24, 2020 at 5:30 PM Aaron Langford 
> wrote:
>
>> This seems to confirm that the S3 file system implementation is not being
>> loaded when you start your job.
>>
>> Can you share the details of how you are getting the flink-s3-fs-hadoop 
>> artifact
>> onto your cluster? Are you simply ssh-ing to the master node and doing this
>> manually? Are you doing this via a bootstrap action? Timing of this
>> action would be relevant as well.
>>
>> Aaron
>>
>> On Fri, Jan 24, 2020 at 8:12 AM Senthil Kumar 
>> wrote:
>>
>>> Thanks, here’s the debug output. It looks like we need to setup
>>> hdfs-config file in the flink config.
>>>
>>> Could you advise us further?
>>>
>>>
>>>
>>> --
>>>
>>>
>>>
>>> 2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem
>>> - Loading extension file systems via services
>>>
>>> 2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem
>>> - Added file system
>>> maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory
>>>
>>> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>>   - Cannot find hdfs-default configuration-file path in
>>> Flink config.
>>>
>>> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>>   - Cannot find hdfs-site configuration-file path in
>>> Flink config.
>>>
>>>
>>>
>>>
>>>
>>> *From: *Aaron Langford 
>>> *Date: *Thursday, January 23, 2020 at 12:22 PM
>>> *To: *Senthil Kumar 
>>> *Cc: *Yang Wang , "user@flink.apache.org" <
>>> user@flink.apache.org>
>>> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on
>>> EMR)
>>>
>>>
>>>
>>> When creating your cluster, you can provide configurations that EMR will
>>> find the right home for. Example for the aws cli:
>>>
>>>
>>>
>>> aws emr create-cluster ... --configurations '[{
>>> "Classification": "flink-log4j",
>>> "Properties": {
>>>   "log4j.rootLogger": "DEBUG,file"
>>> }
>>>   },{
>>> "Classification": "flink-log4j-yarn-session",
>>> "Properties": {
>>>   "log4j.rootLogger": "DEBUG,stdout"
>>>   }]'
>>>
>>>
>>>
>>> If you can't take down your existing EMR cluster for some reason, you
>>> can ask AWS to modify these configurations for you on the cluster. They
>>> should take effect when you start a new Flink job (new job manager as well
>>> as a new job in that job manager). It is my understanding that
>>> configuration changes require a restart of a flink jobmanager + topology in
>>> order to take effect. Here's an example of how to modify an existing
>>> cluster (I just threw this together, so beware malformed JSON):
>>>
>>>
>>>
>>> aws emr modify-instance-groups --cli-input-json '{
>>> "ClusterId": "",
>>> "InstanceGroups": [{
>>> "InstanceGroupId": "",
>>> "Configurations": [{
>>> "Classification": "flink-log4j",
>>> "Properties": {
>>> "log4j.rootLogger": "DEBUG,file"
>>> }
>>> },{
>>> "Classification": "flink-log4j-yarn-session",
>>> "Properties": {
>>> "log4j.rootLogger": "DEBUG,stdout"
>>> }
>>> }]
>>> },{
>>> "InstanceGroupId": "",
>>> "Configurations": [{
>>> "Classification": "flink-log4j",
>>> "Properties": {
>>> "log4j.rootLogger": "DEBUG,file"
>>> 

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-28 Thread Arvid Heise
Hi Aaron,

I encountered a similar issue when running on EMR. On the slaves, there are
some lingering Hadoop versions older than 2.7 (it was 2.6 if I remember
correctly), which bleed into Flink's classpath.
Flink checks the Hadoop version to determine whether certain capabilities like
file truncation are available. In your case, it loads the version info
of the old jar first and determines that file truncation is not available.

I'm currently working on a fix [1]. Then, Flink will only check the Hadoop
version of the Hadoop that is bundled in the plugin and not the one that
comes first in the classpath.

As a workaround, you could try the following options:
- Don't use s3 as a plugin but put the jar in lib/ (that's not working
anymore in Flink 1.10+ though)
- Or connect to your slaves, find the old hadoop-common*.jar, and remove it
manually. The location of all relevant hadoop-common*.jar files should be
visible in the task manager logs. If they have a version number >2.7 they are
good; all others may result in issues.

Ping me if you need further assistance.

[1] https://issues.apache.org/jira/browse/FLINK-15777

On Fri, Jan 24, 2020 at 5:30 PM Aaron Langford 
wrote:

> This seems to confirm that the S3 file system implementation is not being
> loaded when you start your job.
>
> Can you share the details of how you are getting the flink-s3-fs-hadoop 
> artifact
> onto your cluster? Are you simply ssh-ing to the master node and doing this
> manually? Are you doing this via a bootstrap action? Timing of this
> action would be relevant as well.
>
> Aaron
>
> On Fri, Jan 24, 2020 at 8:12 AM Senthil Kumar 
> wrote:
>
>> Thanks, here’s the debug output. It looks like we need to setup
>> hdfs-config file in the flink config.
>>
>> Could you advise us further?
>>
>>
>>
>> --
>>
>>
>>
>> 2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem
>> - Loading extension file systems via services
>>
>> 2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem
>> - Added file system
>> maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory
>>
>> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>   - Cannot find hdfs-default configuration-file path in
>> Flink config.
>>
>> 2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils
>>   - Cannot find hdfs-site configuration-file path in
>> Flink config.
>>
>>
>>
>>
>>
>> *From: *Aaron Langford 
>> *Date: *Thursday, January 23, 2020 at 12:22 PM
>> *To: *Senthil Kumar 
>> *Cc: *Yang Wang , "user@flink.apache.org" <
>> user@flink.apache.org>
>> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>>
>>
>>
>> When creating your cluster, you can provide configurations that EMR will
>> find the right home for. Example for the aws cli:
>>
>>
>>
>> aws emr create-cluster ... --configurations '[{
>> "Classification": "flink-log4j",
>> "Properties": {
>>   "log4j.rootLogger": "DEBUG,file"
>> }
>>   },{
>> "Classification": "flink-log4j-yarn-session",
>> "Properties": {
>>   "log4j.rootLogger": "DEBUG,stdout"
>>   }]'
>>
>>
>>
>> If you can't take down your existing EMR cluster for some reason, you can
>> ask AWS to modify these configurations for you on the cluster. They should
>> take effect when you start a new Flink job (new job manager as well as a
>> new job in that job manager). It is my understanding that configuration
>> changes require a restart of a flink jobmanager + topology in order to take
>> effect. Here's an example of how to modify an existing cluster (I just
>> threw this together, so beware malformed JSON):
>>
>>
>>
>> aws emr modify-instance-groups --cli-input-json '{
>> "ClusterId": "",
>> "InstanceGroups": [{
>> "InstanceGroupId": "",
>> "Configurations": [{
>> "Classification": "flink-log4j",
>> "Properties": {
>> "log4j.rootLogger": "DEBUG,file"
>> }
>> },{
>> "Classification": "flink-log4j-yarn-session",
>> "Properties": {
>> "log4j.rootLogger": "DEBUG,stdout"
>> }
>> }]
>> },{
>> "InstanceGroupId": "",
>> "Configurations": [{
>> "Classification": "flink-log4j",
>> "Properties": {
>> "log4j.rootLogger": "DEBUG,file"
>> }
>> },{
>> "Classification": "flink-log4j-yarn-session",
>> "Properties": {
>> "log4j.rootLogger": "DEBUG,stdout"
>> }
>> }]
>>  }]
>> }'
>>
>>
>>
>> On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar 
>> wrote:
>>
>> Could you tell us how to turn on debug level logs?
>>
>>
>>
>> We attempted this (on driver)
>>
>>
>>
>> sudo stop hadoop-yarn-resourcemanager
>>
>>
>>
>> followed the instructions here
>>
>>
>> https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcem

Re: How to debug a job stuck in a deployment/run loop?

2020-01-28 Thread Arvid Heise
Hi Jason,

could you describe your topology? Are you writing to Kafka? Are you using
exactly-once? Are you seeing any warnings?
If so, one thing that immediately comes to mind is
transaction.max.timeout.ms. If the value in Flink (by default 1 hour) is
higher than what the Kafka brokers support, the job may run into indefinite
restart loops in rare cases.

"Kafka brokers by default have transaction.max.timeout.ms set to 15
minutes. This property will not allow to set transaction timeouts for the
producers larger than it’s value. FlinkKafkaProducer011 by default sets the
transaction.timeout.ms property in producer config to 1 hour, thus
transaction.max.timeout.ms should be increased before using the
Semantic.EXACTLY_ONCE mode."
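
If that turns out to be the cause, a rough sketch of keeping the producer's
transaction timeout within the broker limit (topic name, broker address and
serialization schema are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceProducerSketch {

    static FlinkKafkaProducer011<String> createProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        // keep this at or below the broker's transaction.max.timeout.ms (15 min by default)
        props.setProperty("transaction.timeout.ms", "900000");

        return new FlinkKafkaProducer011<>(
            "output-topic",                                       // placeholder topic
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    }
}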

Best,

Arvid

On Fri, Jan 24, 2020 at 2:47 AM Jason Kania  wrote:

> I am attempting to migrate from 1.7.1 to 1.9.1 and I have hit a problem
> where previously working jobs can no longer launch after being submitted.
> In the UI, the submitted jobs show up as deploying for a period, then go
> into a run state before returning to the deploy state and this repeats
> regularly with the job bouncing between states. No exceptions or errors are
> visible in the logs. There is no data coming in for the job to process and
> the kafka queues are empty.
>
> If I look at the thread activity of the task manager running the job in
> top, I see that the busiest threads are flink-akka threads, sometimes
> jumping to very high CPU numbers. That is all I have for info.
>
> Any suggestions on how to debug this? I can set break points and connect
> if that helps, just not sure at this point where to start.
>
> Thanks,
>
> Jason
>


Re: PostgreSQL JDBC connection drops after inserting some records

2020-01-28 Thread Arvid Heise
Hi Soheil,

what is your actual question? Did the application eventually finish or does
it keep restarting?

In general, communication with external systems may fail from time to time.
Only if it persists would we explore it further. If it is very rare, a restart
should already help.
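
If you want the job to ride over such rare drops automatically, a sketch of a
fixed-delay restart strategy (the attempt count and delay are arbitrary
examples):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // restart up to 10 times, waiting 30 seconds between attempts
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(10, Time.of(30, TimeUnit.SECONDS)));

        // ... file source, transformation to the table schema, JDBC sink, env.execute()
    }
}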

Best,

Arvid

On Thu, Jan 23, 2020 at 5:35 PM Soheil Pourbafrani 
wrote:

> Hi,
> I have a peace of Flink Streaming code that reads data from files and
> inserts them into the PostgreSQL table. After inserting 6 to 11 million
> records, I got the following errors:
>
>
> *Caused by: java.lang.RuntimeException: Execution of JDBC statement
> failed. at
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
> at
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
> at
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
> at
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
> at
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> ... 15 moreCaused by: java.sql.BatchUpdateException: Batch entry 0 INSERT
> INTO csv_data(asset, tag, t, q, v, backfill, createdAt, createdBy) VALUES
> ('SST', 'XC_XC', '2015-04-11 21:36:23+03', 12.0, '1.00', 'FALSE',
> '2020-01-23 19:22:14.469+03', 'system') ON CONFLICT DO NOTHING was aborted:
> An I/O error occurred while sending to the backend.  Call getNextException
> to see other errors in the batch. at
> org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
> at
> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:515)
> at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:853) at
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
> at
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
> ... 21 moreCaused by: org.postgresql.util.PSQLException: An I/O error
> occurred while sending to the backend. at
> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:516)
> ... 24 moreCaused by: java.io.EOFException at
> org.postgresql.core.PGStream.receiveChar(PGStream.java:337) at
> org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2000)*
> at
> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:510)
> ... 24 more
>
> However, as I have enabled the restart strategy, the app is automatically
> restarted and reconnects to the database.
> My code simply reads data from files and, after transforming it into the
> table schema, inserts the rows into the table.
>
> It would be great if anyone can help me with this
> Thanks
>