Re: Flink cluster on k8s with rocksdb state backend

2019-10-17 Thread Steven Nelson
You may want to look at using instances with local SSD drives. You don't really
need to keep the state data between instance stops and starts, because Flink
will have to restore from a checkpoint or savepoint anyway, so using ephemeral
storage isn't a problem.
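
For illustration, a rough sketch of that setup (not from this thread; the checkpoint
URI and the local SSD mount path are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSsdRocksDbExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints and savepoints go to durable storage (placeholder URI) ...
        RocksDBStateBackend backend = new RocksDBStateBackend("s3://my-bucket/checkpoints", true);
        // ... while RocksDB's working files live on the ephemeral local SSD mount.
        backend.setDbStoragePath("/mnt/local-ssd/rocksdb");
        env.setStateBackend(backend);
    }
}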

Sent from my iPhone

> On Oct 17, 2019, at 11:31 PM, dhanesh arole  wrote:
> 
> Hello all, 
> 
> I am trying to provision a Flink cluster on k8s. Some of the jobs in our 
> existing cluster use the RocksDB state backend. I wanted to take a look at a 
> Flink helm chart or deployment manifests that provision task managers with 
> dynamic PVs, and see how they are managed. We are running on a kops-managed 
> k8s cluster on AWS (not EKS). Also, some pointers on expected pain points, 
> surprises, and monitoring strategies would be really helpful. 
> 
> 
> Thanks & Regards
> -
> Dhanesh
> 


Flink cluster on k8s with rocksdb state backend

2019-10-17 Thread dhanesh arole
Hello all,

I am trying to provision a Flink cluster on k8s. Some of the jobs in our
existing cluster use the RocksDB state backend. I wanted to take a look at a
Flink helm chart or deployment manifests that provision task managers with
dynamic PVs, and see how they are managed. We are running on a kops-managed k8s
cluster on AWS (not EKS). Also, some pointers on expected pain points,
surprises, and monitoring strategies would be really helpful.


Thanks & Regards
-
Dhanesh


Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread Rong Rong
Splendid. Thanks for following up and moving the discussion forward :-)

--
Rong

On Thu, Oct 17, 2019 at 11:38 AM John Smith  wrote:

> I recorded two:
> Time interval: https://issues.apache.org/jira/browse/FLINK-14442
> Checkpointing: https://issues.apache.org/jira/browse/FLINK-14443
>
>
> On Thu, 17 Oct 2019 at 14:00, Rong Rong  wrote:
>
>> Yes, I think having a time interval execution (for the AppendableSink)
>> should be a good idea.
>> Can you please open a Jira issue[1] for further discussion.
>>
>> --
>> Rong
>>
>> [1] https://issues.apache.org/jira/projects/FLINK/issues
>>
>> On Thu, Oct 17, 2019 at 9:48 AM John Smith 
>> wrote:
>>
>>> Yes, correct. I set the batch interval to 1 and it works fine. Anyway, I
>>> think the JDBC sink could use some improvements, like batchInterval + time
>>> interval execution: if the batch doesn't fill up, then execute whatever
>>> is left at that time interval.
>>>
>>> On Thu, 17 Oct 2019 at 12:22, Rong Rong  wrote:
>>>
 Hi John,

 You are right. IMO the batch interval setting is used for increasing
 the JDBC execution performance purpose.
 The reason the exception doesn't happen for your INSERT INTO statement with a
 `non_existing_table` is that the JDBCAppendableSink does not
 check table existence beforehand. That being said, it should fail at the
 first batch execution.

 Also, I think the `batchInterval` setting is local to the task; this
 means the default 5000 batchInterval is per-partition.

 --
 Rong

 On Wed, Oct 16, 2019 at 7:21 AM John Smith 
 wrote:

> Ok, I think I found it. It's the batch interval setting. From what I
> see, if we want a "realtime" stream to the database we have to set it to 1,
> otherwise the sink will wait until the batch interval count is reached.
>
> The batch interval mechanism doesn't seem correct? If the default size
> is 5000 and you need to insert 5001, you will never get that last record?
>
> On Tue, 15 Oct 2019 at 15:54, John Smith 
> wrote:
>
>> Hi, using 1.8.0
>>
>> I have the following job: https://pastebin.com/ibZUE8Qx
>>
>> So the job does the following steps...
>> 1- Consume from Kafka and return JsonObject
>> 2- Map JsonObject to MyPojo
>> 3- Convert The stream to a table
>> 4- Insert the table to JDBC sink table
>> 5- Print the table.
>>
>> - The job seems to work with no errors and I can see the row print to
>> the console and I see nothing in my database.
>> - If I put invalid host for the database and restart the job, I get a
>> connection SQLException error. So at least we know that works.
>> - If I make a typo on the INSERT INTO statement like INSERTS INTO
>> non_existing_table, there are no exceptions thrown, the print happens, 
>> the
>> stream continues to work.
>> - If I drop the table from the database, same thing, no exceptions
>> thrown, the print happens, the stream continues to work.
>>
>> So am I missing something?
>>
>


Re: Flink State Migration Version 1.8.2

2019-10-17 Thread Paul Lam
Hi,

Could you confirm that you’re using POJOSerializer before and after migration? 

Best,
Paul Lam

> On Oct 17, 2019, at 21:34, ApoorvK  wrote:
> 
> It is throwing the error below.
> The class I am adding variables to has other variables that are objects of classes
> which are also in state.
> 
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> typeSerializer for operator state must not be incompatible.
>   at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:323)
>   at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:214)
>   at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.initializeState(AsyncWaitOperator.java:268)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:740)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Data processing with HDFS local or remote

2019-10-17 Thread Pritam Sadhukhan
Hi,

I am trying to process data stored on HDFS using Flink batch jobs.
Our data is split across 16 data nodes.

I am curious to know how data will be pulled from the data nodes when the
parallelism is set to the same number as the data splits on HDFS, i.e. 16.

Are the Flink tasks executed locally on the data node servers, or do they run
on the Flink nodes, with the data pulled remotely?

Any help will be appreciated.

Regards,
Pritam.


ProcessFunction Timer

2019-10-17 Thread Navneeth Krishnan
Hi All,

I'm currently using a tumbling window of 5 seconds using TumblingTimeWindow,
but due to a change in requirements I no longer need to window every incoming
record. With that said, I'm planning to use a process function to achieve this
selective windowing.

I looked at the example provided in the documentation and I'm not clear on
how I can implement the windowing.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

Basically what I want is to keep collecting data until 5 seconds have passed
since the first record came in for the key, and then forward it to the
next operator. I will be using ListState to add the entries and
register a timer when the list is empty. When the timer fires I will collect
all the entries and forward them, and also remove the entries from the list. Do you
think this will suffice, or does anything else have to be done?
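
For reference, a minimal sketch of that pattern (processing-time timers, String
keys/values and the class name are illustrative assumptions, not taken from the
actual job):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SelectiveWindowFunction extends KeyedProcessFunction<String, String, List<String>> {

    private transient ListState<String> buffer;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out) throws Exception {
        // First element for this key: register a timer 5 seconds from now.
        if (timerState.value() == null) {
            long fireAt = ctx.timerService().currentProcessingTime() + 5_000L;
            ctx.timerService().registerProcessingTimeTimer(fireAt);
            timerState.update(fireAt);
        }
        buffer.add(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out) throws Exception {
        // Emit everything buffered for this key and clear state; the next element
        // starts a fresh "window". Fired timers are removed automatically.
        List<String> elements = new ArrayList<>();
        for (String e : buffer.get()) {
            elements.add(e);
        }
        out.collect(elements);
        buffer.clear();
        timerState.clear();
    }
}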

Also, I will have about 1M keys; would there be any performance impact
from creating this many timers? I believe the timers are automatically
removed after they fire, or should I do anything extra to remove these
timers?

Thanks


Re: Broadcast state

2019-10-17 Thread Navneeth Krishnan
Ya, there will not be a problem with duplicates. But what I'm trying to
achieve: there is a large static state which would ideally be present just once
per node rather than stored per slot. The reason
is that the state is quite large, around 100GB of mostly static data,
and it is not needed at the per-slot level. It could be kept at the per-instance
level, where each slot reads from this shared memory.

Thanks

On Wed, Oct 9, 2019 at 12:13 AM Congxian Qiu  wrote:

> Hi,
>
> After using Redis, why would you need to care about eliminating duplicated data?
> If you specify the same key, then Redis will do the deduplication.
>
> Best,
> Congxian
>
>
> Fabian Hueske  wrote on Wed, Oct 2, 2019 at 5:30 PM:
>
>> Hi,
>>
>> State is always associated with a single task in Flink.
>> The state of a task cannot be accessed by other tasks of the same
>> operator or tasks of other operators.
>> This is true for every type of state, including broadcast state.
>>
>> Best, Fabian
>>
>>
>> On Tue., Oct. 1, 2019 at 08:22, Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I can use Redis, but I'm still having a hard time figuring out how I can
>>> eliminate duplicate data. Today, without broadcast state in 1.4, I'm using a
>>> cache to lazily load the data. I thought the broadcast state would be similar
>>> to that of Kafka Streams, where I have read access to the state across the
>>> pipeline. That would indeed solve a lot of problems. Is there some way I can
>>> do the same with Flink?
>>>
>>> Thanks!
>>>
>>> On Mon, Sep 30, 2019 at 10:36 PM Congxian Qiu 
>>> wrote:
>>>
 Hi,

 Could you use some cache system such as HBase or Redis to store this
 data, and query from the cache if needed?

 Best,
 Congxian


 Navneeth Krishnan  wrote on Tue, Oct 1, 2019 at 10:15 AM:

> Thanks Oytun. The problem with doing that is that the same data will
> have to be stored multiple times, wasting memory. In my case there will be
> around a million entries which need to be used by at least two operators
> for now.
>
> Thanks
>
> On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez  wrote:
>
>> This is how we currently use broadcast state. Our states are
>> re-usable (code-wise); every operator that wants to consume it basically keeps
>> the same descriptor state locally by processBroadcastElement'ing into a
>> local state.
>>
>> I am open to suggestions. I see this as a hard drawback of dataflow
>> programming or of the Flink framework.
>>
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez  wrote:
>>
>>> You can re-use the broadcasted state (along with its descriptor)
>>> that comes into your KeyedBroadcastProcessFunction, in another operator
>>> downstream. That's basically duplicating the broadcasted state in whichever
>>> operator you want to use it, every time.
>>>
>>>
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan <
>>> reachnavnee...@gmail.com> wrote:
>>>
 Hi All,

 Is it possible to access a broadcast state across the pipeline? For
 example, say I have a KeyedBroadcastProcessFunction which adds the incoming
 data to state, and I have a downstream operator where I need the same state as
 well. Would I be able to just read the broadcast state with a read-only
 view? I know this is possible in Kafka Streams.

 Thanks

>>>


Re: Should I use static database connection pool?

2019-10-17 Thread John Smith
Also, the pool should be transient because it holds connections, which
shouldn't/cannot be serialized.
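
For illustration, a rough sketch of a sink that builds a non-static, transient pool
in open() and closes it in close(); the driver, URL, table and element type are
placeholders, not taken from the original code:

import java.sql.Connection;
import java.sql.PreparedStatement;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PooledJdbcSink extends RichSinkFunction<String> {

    // transient: the pool is built on the task manager in open(), never serialized
    private transient ComboPooledDataSource dataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new ComboPooledDataSource();
        dataSource.setDriverClass("org.apache.phoenix.jdbc.PhoenixDriver"); // placeholder driver
        dataSource.setJdbcUrl("jdbc:phoenix:zk-host:2181");                 // placeholder URL
        dataSource.setMaxPoolSize(20);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // One upsert per record; table and column names are placeholders.
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO my_table (payload) VALUES (?)")) {
            stmt.setString(1, value);
            stmt.executeUpdate();
        }
    }

    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close();
        }
        super.close();
    }
}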

On Thu., Oct. 17, 2019, 9:39 a.m. John Smith, 
wrote:

> If by task you mean job, then yes, global static variables initialized in
> the main of the job do not get serialized/transferred to the nodes where
> that job may get assigned.
>
> The other thing is, since it is a sink, the sink will be serialized to
> that node and then initialized, so that static variable will be local to
> that sink.
>
> Someone from flink should chime in :p
>
> On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, 
> wrote:
>
>> Usually the database connection pool is thread safe. When you say task,
>> do you mean a single deployed Flink job?
>>
>> I still think a sink is only initialized once. You can prove it by putting
>> logging in the open and close.
>>
>> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma,  wrote:
>>
>>> Thanks, John. If I don't make my pool static, I think it will create one
>>> instance for each task. If the pool is static, each JVM can hold one
>>> instance. Depending on the deployment approach, it can create one to
>>> multiple instances. Is this correct?
>>> Konstantin's talk mentions static variables can lead to deadlocks, etc.;
>>> I don't know if the loss of the JDBC connection is also related to this. Btw, I
>>> am using JDBC to write to HBase; maybe it also matters.
>>>
>>>
>>> On Thu, Oct 17, 2019 at 2:32 AM John Smith 
>>> wrote:
>>>
 Xin. The open()/close() cycle of a Sink function is only called once, so
 I don't think you even need to make your pool static. Someone can
 confirm this?

 Miki, the JDBC connector lacks some functionality; for instance, it only
 flushes batches when the batch interval is reached. So if you set the batch
 interval to 5 and you get 6 records, the 6th one will not be flushed to the DB
 until you get another 4. You can see in the code above that Xin has put a
 timer-based flush as well. Also, the JDBC connector does not have checkpointing if you
 ever need that, which is a surprise because most JDBC databases have
 transactions, so it would be nice to have.

 On Wed, 16 Oct 2019 at 10:58, miki haiat  wrote:

> If it's a sink that uses JDBC, why not use the Flink JDBC sink
> connector?
>
>
> On Wed, Oct 16, 2019, 17:03 Xin Ma  wrote:
>
>> I have watched one of the recent Flink Forward videos, Apache Flink
>> Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
>> that we should avoid using static variables to share state between tasks.
>>
>> So should I also avoid a static database connection? I am currently
>> facing a weird issue where the database connection is lost at some
>> point, bringing the whole job down.
>>
>> *I have created a database tool like this, *
>>
>> public class Phoenix {
>>
>> private static ComboPooledDataSource dataSource = new
>> ComboPooledDataSource();
>> static {
>> try {
>>
>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>> dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>> null));
>> dataSource.setMaxPoolSize(200);
>> dataSource.setMinPoolSize(10);
>> Properties properties = new Properties();
>> properties.setProperty("user", "---");
>> properties.setProperty("password", "---");
>> dataSource.setProperties(properties);
>> } catch (PropertyVetoException e) {
>> throw new RuntimeException("phoenix datasource conf
>> error");
>> }
>> }
>>
>> private static Connection getConn() throws SQLException {
>> return dataSource.getConnection();
>> }
>>
>> public static < T > T executeQuery(String sql, Caller < T >
>> caller) throws SQLException {
>> // .. execution logic
>> }
>>
>> public static int executeUpdateWithTx(List < String > sqlList)
>> throws SQLException {
>> // ..update logic
>> }
>>
>> }
>>
>> *Then I implemented my customized sink function like this,*
>>
>> public class CustomizedSink extends RichSinkFunction < Record > {
>>
>> private static Logger LOG =
>> LoggerFactory.getLogger("userFlinkLogger");
>> private static final int batchInsertSize = 5000;
>> private static final long flushInterval = 60 * 1000 L;
>> private long lastFlushTime;
>> private BatchCommit batchCommit;
>> private ConcurrentLinkedQueue < Object > cacheQueue;
>> private ExecutorService threadPool;
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> cacheQueue = new ConcurrentLinkedQueue < > ();
>> threadPool = Exec

Managing Job Deployments in Production

2019-10-17 Thread Peter Groesbeck
How are folks here managing deployments in production?

We are deploying Flink jobs on EMR manually at the moment, but would like to
move towards some form of automation before anything goes into production.
Adding additional EMR Steps to a long-running cluster to deploy or update
jobs seems like the most accessible method; however, getting feedback output
(i.e. listing currently running jobs, detecting transient failures) is
lacking.

I've briefly looked into Ververica platform which is an ideal solution
except for the licensing aspect. Ideally I'd like to work with something
open source. A UI is not necessary.

Has anyone tried ING Bank's open-source tool?
https://github.com/ing-bank/flink-deployer

Any and all ideas are welcome, thanks!


Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread John Smith
I recorded two:
Time interval: https://issues.apache.org/jira/browse/FLINK-14442
Checkpointing: https://issues.apache.org/jira/browse/FLINK-14443


On Thu, 17 Oct 2019 at 14:00, Rong Rong  wrote:

> Yes, I think having a time interval execution (for the AppendableSink)
> should be a good idea.
> Can you please open a Jira issue[1] for further discussion.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/projects/FLINK/issues
>
> On Thu, Oct 17, 2019 at 9:48 AM John Smith  wrote:
>
>> Yes, correct. I set the batch interval to 1 and it works fine. Anyway, I
>> think the JDBC sink could use some improvements, like batchInterval + time
>> interval execution: if the batch doesn't fill up, then execute whatever
>> is left at that time interval.
>>
>> On Thu, 17 Oct 2019 at 12:22, Rong Rong  wrote:
>>
>>> Hi John,
>>>
>>> You are right. IMO the batch interval setting is used for increasing the
>>> JDBC execution performance purpose.
>>> The reason the exception doesn't happen for your INSERT INTO statement
>>> with a `non_existing_table` is that the JDBCAppendableSink does not
>>> check table existence beforehand. That being said, it should fail at the
>>> first batch execution.
>>>
>>> Also, I think the `batchInterval` setting is local to the task; this
>>> means the default 5000 batchInterval is per-partition.
>>>
>>> --
>>> Rong
>>>
>>> On Wed, Oct 16, 2019 at 7:21 AM John Smith 
>>> wrote:
>>>
 Ok, I think I found it. It's the batch interval setting. From what I
 see, if we want a "realtime" stream to the database we have to set it to 1,
 otherwise the sink will wait until the batch interval count is reached.

 The batch interval mechanism doesn't seem correct? If the default size
 is 5000 and you need to insert 5001, you will never get that last record?

 On Tue, 15 Oct 2019 at 15:54, John Smith 
 wrote:

> Hi, using 1.8.0
>
> I have the following job: https://pastebin.com/ibZUE8Qx
>
> So the job does the following steps...
> 1- Consume from Kafka and return JsonObject
> 2- Map JsonObject to MyPojo
> 3- Convert The stream to a table
> 4- Insert the table to JDBC sink table
> 5- Print the table.
>
> - The job seems to work with no errors and I can see the row print to
> the console and I see nothing in my database.
> - If I put invalid host for the database and restart the job, I get a
> connection SQLException error. So at least we know that works.
> - If I make a typo on the INSERT INTO statement like INSERTS INTO
> non_existing_table, there are no exceptions thrown, the print happens, the
> stream continues to work.
> - If I drop the table from the database, same thing, no exceptions
> thrown, the print happens, the stream continues to work.
>
> So am I missing something?
>



Re: Mirror Maker 2.0 cluster and starting from latest offset and other queries

2019-10-17 Thread Vishal Santoshi
oh shit.. sorry, wrong forum :)

On Thu, Oct 17, 2019 at 1:41 PM Piotr Nowojski  wrote:

> Hi,
>
> It sounds like setting up Mirror Maker has very little to do with Flink.
> Shouldn’t you try asking this question on the Kafka mailing list?
>
> Piotrek
>
> On 16 Oct 2019, at 16:06, Vishal Santoshi 
> wrote:
>
> 1.  still no clue, apart from the fact that ConsumerConfig gets it from
> somewhere ( need to override it and have tried both auto.offset.reset
> =latest and  consumer.auto.offset.reset = latest
>
> [2019-10-16 13:50:34,260] INFO ConsumerConfig values:
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
>
> 2. sync.topic.acls.enabled = false
>
>
>
>
> On Tue, Oct 15, 2019 at 4:00 PM Vishal Santoshi 
> wrote:
>
>> 2 queries
>>
>> 1.   I am trying to configure MM2 to start replicating from the head (
>> latest of the topic ) .  Should  auto.offset.reset = latest in
>> mm2.properties be enough ? Unfortunately MM2 will start from the EARLIEST.
>>
>> 2. I  do not have "Authorizer is configured on the broker "  and see
>> this exception java.util.concurrent.ExecutionException:
>> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
>> configured on the broker From
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-Config,ACLSync
>> *sync.topic.acls = false* should do it but does not..
>>
>>
>> Any ideas?
>>
>>
>>
>


Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread Rong Rong
Yes, I think having a time interval execution (for the AppendableSink)
should be a good idea.
Can you please open a Jira issue[1] for further discussion.

--
Rong

[1] https://issues.apache.org/jira/projects/FLINK/issues

On Thu, Oct 17, 2019 at 9:48 AM John Smith  wrote:

> Yes, correct. I set the batch interval to 1 and it works fine. Anyway, I
> think the JDBC sink could use some improvements, like batchInterval + time
> interval execution: if the batch doesn't fill up, then execute whatever
> is left at that time interval.
>
> On Thu, 17 Oct 2019 at 12:22, Rong Rong  wrote:
>
>> Hi John,
>>
>> You are right. IMO the batch interval setting is used for increasing the
>> JDBC execution performance purpose.
>> The reason the exception doesn't happen for your INSERT INTO statement with a
>> `non_existing_table` is that the JDBCAppendableSink does not check
>> table existence beforehand. That being said, it should fail at the first
>> batch execution.
>>
>> Also, I think the `batchInterval` setting is local to the task; this
>> means the default 5000 batchInterval is per-partition.
>>
>> --
>> Rong
>>
>> On Wed, Oct 16, 2019 at 7:21 AM John Smith 
>> wrote:
>>
>>> Ok, I think I found it. It's the batch interval setting. From what I see,
>>> if we want a "realtime" stream to the database we have to set it to 1,
>>> otherwise the sink will wait until the batch interval count is reached.
>>>
>>> The batch interval mechanism doesn't seem correct? If the default size is
>>> 5000 and you need to insert 5001, you will never get that last record?
>>>
>>> On Tue, 15 Oct 2019 at 15:54, John Smith  wrote:
>>>
 Hi, using 1.8.0

 I have the following job: https://pastebin.com/ibZUE8Qx

 So the job does the following steps...
 1- Consume from Kafka and return JsonObject
 2- Map JsonObject to MyPojo
 3- Convert The stream to a table
 4- Insert the table to JDBC sink table
 5- Print the table.

 - The job seems to work with no errors and I can see the row print to
 the console and I see nothing in my database.
 - If I put invalid host for the database and restart the job, I get a
 connection SQLException error. So at least we know that works.
 - If I make a typo on the INSERT INTO statement like INSERTS INTO
 non_existing_table, there are no exceptions thrown, the print happens, the
 stream continues to work.
 - If I drop the table from the database, same thing, no exceptions
 thrown, the print happens, the stream continues to work.

 So am I missing something?

>>>


Re: Elasticsearch6UpsertTableSink how to trigger es delete index.

2019-10-17 Thread Piotr Nowojski
Hi, 

Take a look at the documentation. This [1] describes an example where a running 
query can produce updated results (and thus retract the previous results).

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
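
As a hypothetical illustration (not from the thread; table, field and sink names
are made up), a grouped query with a HAVING clause produces exactly this kind of
retraction: once a key no longer satisfies the condition, its row is withdrawn
and the upsert sink receives element.f0 == false for it:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RetractingQueryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes a registered streaming table "clicks" with a field "user_name".
        // A user's row is part of the result while its count is below 10; once the
        // count reaches 10 the row leaves the result, which reaches the upsert sink
        // as (false, row), i.e. processDelete() is called for that key.
        Table activeUsers = tEnv.sqlQuery(
                "SELECT user_name, COUNT(*) AS cnt " +
                "FROM clicks " +
                "GROUP BY user_name " +
                "HAVING COUNT(*) < 10");

        // "es_active_users" stands for a registered Elasticsearch6UpsertTableSink.
        activeUsers.insertInto("es_active_users");
        env.execute("retracting-query-example");
    }
}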
 


Piotrek

> On 16 Oct 2019, at 09:25, ouywl  wrote:
> 
> 
> Hi,
> When I use Elasticsearch6UpsertTableSink, it seems to implement deletes on the
> index, like this code:
>  @Override
> public void process(Tuple2 element, RuntimeContext ctx, 
> RequestIndexer indexer) {
>if (element.f0) {
>   processUpsert(element.f1, indexer);
>} else {
>   processDelete(element.f1, indexer);
>}
> }
> I don't know which condition triggers element.f0 == false so that the ES index
> delete call processDelete() is invoked.
>  
> 
> 
>   
> ouywl
> ou...@139.com
>  
> 
> Signature customized by NetEase Mail Master
> 
> 



Re: Mirror Maker 2.0 cluster and starting from latest offset and other queries

2019-10-17 Thread Piotr Nowojski
Hi,

It sounds like setting up Mirror Maker has very little to do with Flink. 
Shouldn’t you try asking this question on the Kafka mailing list?

Piotrek

> On 16 Oct 2019, at 16:06, Vishal Santoshi  wrote:
> 
> 1.  still no clue, apart from the fact that ConsumerConfig gets it from 
> somewhere ( need to override it and have tried both auto.offset.reset =latest 
> and  consumer.auto.offset.reset = latest
> 
> [2019-10-16 13:50:34,260] INFO ConsumerConfig values: 
>   allow.auto.create.topics = true
>   auto.commit.interval.ms  = 5000
>   auto.offset.reset = earliest
> 
> 2. sync.topic.acls.enabled = false 
> 
> 
> 
> 
> On Tue, Oct 15, 2019 at 4:00 PM Vishal Santoshi  > wrote:
> 2 queries
> 
> 1.   I am trying to configure MM2 to start replicating from the head ( latest 
> of the topic ) .  Should  auto.offset.reset = latest in mm2.properties be 
> enough ? Unfortunately MM2 will start from the EARLIEST.
> 
> 2. I  do not have "Authorizer is configured on the broker "  and see this 
> exception java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is 
> configured on the broker From 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-Config,ACLSync
>  
> 
> sync.topic.acls = false should do it but does not..
> 
> 
> Any ideas? 
> 
> 



Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread John Smith
Yes, correct. I set the batch interval to 1 and it works fine. Anyway, I
think the JDBC sink could use some improvements, like batchInterval + time
interval execution: if the batch doesn't fill up, then execute whatever
is left at that time interval.
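
For illustration, a rough sketch of that workaround using the JDBCAppendTableSink
builder; the driver, connection details and the insert statement are placeholders:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

public class JdbcSinkExample {
    public static JDBCAppendTableSink buildSink() {
        return JDBCAppendTableSink.builder()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://db-host:5432/mydb")
                .setUsername("user")
                .setPassword("secret")
                .setQuery("INSERT INTO my_table (id, payload) VALUES (?, ?)")
                .setParameterTypes(Types.LONG, Types.STRING)
                .setBatchSize(1) // flush every record immediately instead of waiting for 5000
                .build();
    }
}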

On Thu, 17 Oct 2019 at 12:22, Rong Rong  wrote:

> Hi John,
>
> You are right. IMO the batch interval setting is used for increasing the
> JDBC execution performance purpose.
> The reason the exception doesn't happen for your INSERT INTO statement with a
> `non_existing_table` is that the JDBCAppendableSink does not check
> table existence beforehand. That being said, it should fail at the first
> batch execution.
>
> Also, I think the `batchInterval` setting is local to the task; this means
> the default 5000 batchInterval is per-partition.
>
> --
> Rong
>
> On Wed, Oct 16, 2019 at 7:21 AM John Smith  wrote:
>
>> Ok, I think I found it. It's the batch interval setting. From what I see,
>> if we want a "realtime" stream to the database we have to set it to 1,
>> otherwise the sink will wait until the batch interval count is reached.
>>
>> The batch interval mechanism doesn't seem correct? If the default size is
>> 5000 and you need to insert 5001, you will never get that last record?
>>
>> On Tue, 15 Oct 2019 at 15:54, John Smith  wrote:
>>
>>> Hi, using 1.8.0
>>>
>>> I have the following job: https://pastebin.com/ibZUE8Qx
>>>
>>> So the job does the following steps...
>>> 1- Consume from Kafka and return JsonObject
>>> 2- Map JsonObject to MyPojo
>>> 3- Convert The stream to a table
>>> 4- Insert the table to JDBC sink table
>>> 5- Print the table.
>>>
>>> - The job seems to work with no errors and I can see the row print to
>>> the console and I see nothing in my database.
>>> - If I put invalid host for the database and restart the job, I get a
>>> connection SQLException error. So at least we know that works.
>>> - If I make a typo on the INSERT INTO statement like INSERTS INTO
>>> non_existing_table, there are no exceptions thrown, the print happens, the
>>> stream continues to work.
>>> - If I drop the table from the database, same thing, no exceptions
>>> thrown, the print happens, the stream continues to work.
>>>
>>> So am I missing something?
>>>
>>


currentWatermark for Event Time is not increasing fast enough to go past the window.maxTimestamp

2019-10-17 Thread Vijay Balakrishnan
Hi,
*Event Time Window: 15s*
My currentWatermark for Event Time processing is not increasing fast enough
to go past the window maxTimestamp.
I have reduced the *bound* used for watermark calculation to just *10 ms*.
I have increased the parallelism to process input from Kinesis in
parallel with 2 slots on my laptop: env.addSource(kinesisConsumer).setParallelism(2);
For FlinkKinesisConsumer, I added a property from flink-1.8.0:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, 25); // this didn't seem to help

// in EventTimeTrigger.java: if (window.maxTimestamp() <= ctx.getCurrentWatermark()) Trigger.FIRE;
My event producer to Kinesis is producing at a delay of 2500 ms for each
record (business requirement).
What else can I do to consume data from Kinesis faster and cross the
threshold for the
currentWatermark to increase beyond window.maxTimestamp sooner?

*MonitoringTSWAssigner* code:
public class MonitoringTSWAssigner implements
AssignerWithPunctuatedWatermarks> {
private long bound = 5 * (long) 1000; // 5-sec out-of-order bound in millisecs
private long maxTimestamp = Long.MIN_VALUE;

public MonitoringTSWAssigner() {
}

public MonitoringTSWAssigner(long bound) {
this.bound = bound;
}

public long extractTimestamp(Map monitoring, long
previousTS) {
long extractedTS = getExtractedTS(monitoring);
if (extractedTS > maxTimestamp) {
maxTimestamp = extractedTS;
}
return extractedTS;
// return System.currentTimeMillis();
}

public long getExtractedTS(Map monitoring) {
final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP)
!= null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
return Utils.getLongFromDateStr(eventTimestamp);
}

@Override
public Watermark checkAndGetNextWatermark(Map
monitoring, long extractedTimestamp) {
long extractedTS = getExtractedTS(monitoring);
long nextWatermark = extractedTimestamp - bound;
return new Watermark(nextWatermark);
}
}

TIA


Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread Rong Rong
Hi John,

You are right. IMO the batch interval setting is used for increasing the
JDBC execution performance purpose.
The reason the exception doesn't happen for your INSERT INTO statement with a
`non_existing_table` is that the JDBCAppendableSink does not check
table existence beforehand. That being said, it should fail at the first
batch execution.

Also, I think the `batchInterval` setting is local to the task; this means
the default 5000 batchInterval is per-partition.

--
Rong

On Wed, Oct 16, 2019 at 7:21 AM John Smith  wrote:

> Ok, I think I found it. It's the batch interval setting. From what I see,
> if we want a "realtime" stream to the database we have to set it to 1,
> otherwise the sink will wait until the batch interval count is reached.
>
> The batch interval mechanism doesn't seem correct? If the default size is
> 5000 and you need to insert 5001, you will never get that last record?
>
> On Tue, 15 Oct 2019 at 15:54, John Smith  wrote:
>
>> Hi, using 1.8.0
>>
>> I have the following job: https://pastebin.com/ibZUE8Qx
>>
>> So the job does the following steps...
>> 1- Consume from Kafka and return JsonObject
>> 2- Map JsonObject to MyPojo
>> 3- Convert The stream to a table
>> 4- Insert the table to JDBC sink table
>> 5- Print the table.
>>
>> - The job seems to work with no errors and I can see the row print to the
>> console and I see nothing in my database.
>> - If I put invalid host for the database and restart the job, I get a
>> connection SQLException error. So at least we know that works.
>> - If I make a typo on the INSERT INTO statement like INSERTS INTO
>> non_existing_table, there are no exceptions thrown, the print happens, the
>> stream continues to work.
>> - If I drop the table from the database, same thing, no exceptions
>> thrown, the print happens, the stream continues to work.
>>
>> So am I missing something?
>>
>


Querying nested JSON stream?

2019-10-17 Thread srikanth flink
Hi there,

I'm using the Flink SQL client to run the jobs for me. My stream is JSON with
nested objects. I couldn't find much documentation on querying nested JSON, so
I had to flatten the JSON and query it as:
SELECT `source.ip`, `destination.ip`, `dns.query`, `organization.id`,
`dns.answers.data` FROM source;

Can someone help me with querying the nested JSON directly, so I could save
the resources spent running the flattening job?


Thanks
Srikanth


Re: Should I use static database connection pool?

2019-10-17 Thread John Smith
If by task you mean job, then yes, global static variables initialized in the
main of the job do not get serialized/transferred to the nodes where that
job may get assigned.

The other thing is, since it is a sink, the sink will be serialized to
that node and then initialized, so that static variable will be local to
that sink.

Someone from flink should chime in :p

On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, 
wrote:

> Usually the database connection pool is thread safe. When you say task,
> do you mean a single deployed Flink job?
>
> I still think a sink is only initialized once. You can prove it by putting
> logging in the open and close.
>
> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma,  wrote:
>
>> Thanks, John. If I don't make my pool static, I think it will create one
>> instance for each task. If the pool is static, each JVM can hold one
>> instance. Depending on the deployment approach, it can create one to
>> multiple instances. Is this correct?
>> Konstantin's talk mentions static variables can lead to deadlocks, etc.;
>> I don't know if the loss of the JDBC connection is also related to this. Btw, I
>> am using JDBC to write to HBase; maybe it also matters.
>>
>>
>> On Thu, Oct 17, 2019 at 2:32 AM John Smith 
>> wrote:
>>
>>> Xin. The open()/close() cycle of a Sink function is only called once, so
>>> I don't think you even need to make your pool static. Someone can
>>> confirm this?
>>>
>>> Miki, the JDBC connector lacks some functionality; for instance, it only
>>> flushes batches when the batch interval is reached. So if you set the batch
>>> interval to 5 and you get 6 records, the 6th one will not be flushed to the DB
>>> until you get another 4. You can see in the code above that Xin has put a
>>> timer-based flush as well. Also, the JDBC connector does not have checkpointing if you
>>> ever need that, which is a surprise because most JDBC databases have
>>> transactions, so it would be nice to have.
>>>
>>> On Wed, 16 Oct 2019 at 10:58, miki haiat  wrote:
>>>
 If it's a sink that uses JDBC, why not use the Flink JDBC sink
 connector?


 On Wed, Oct 16, 2019, 17:03 Xin Ma  wrote:

> I have watched one of the recent Flink Forward videos, Apache Flink
> Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
> that we should avoid using static variables to share state between tasks.
>
> So should I also avoid a static database connection? I am currently facing
> a weird issue where the database connection is lost at some point,
> bringing the whole job down.
>
> *I have created a database tool like this, *
>
> public class Phoenix {
>
> private static ComboPooledDataSource dataSource = new
> ComboPooledDataSource();
> static {
> try {
>
> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
> "org.apache.phoenix.jdbc.PhoenixDriver"));
> dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
> null));
> dataSource.setMaxPoolSize(200);
> dataSource.setMinPoolSize(10);
> Properties properties = new Properties();
> properties.setProperty("user", "---");
> properties.setProperty("password", "---");
> dataSource.setProperties(properties);
> } catch (PropertyVetoException e) {
> throw new RuntimeException("phoenix datasource conf
> error");
> }
> }
>
> private static Connection getConn() throws SQLException {
> return dataSource.getConnection();
> }
>
> public static < T > T executeQuery(String sql, Caller < T >
> caller) throws SQLException {
> // .. execution logic
> }
>
> public static int executeUpdateWithTx(List < String > sqlList)
> throws SQLException {
> // ..update logic
> }
>
> }
>
> *Then I implemented my customized sink function like this,*
>
> public class CustomizedSink extends RichSinkFunction < Record > {
>
> private static Logger LOG =
> LoggerFactory.getLogger("userFlinkLogger");
> private static final int batchInsertSize = 5000;
> private static final long flushInterval = 60 * 1000 L;
> private long lastFlushTime;
> private BatchCommit batchCommit;
> private ConcurrentLinkedQueue < Object > cacheQueue;
> private ExecutorService threadPool;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> cacheQueue = new ConcurrentLinkedQueue < > ();
> threadPool = Executors.newFixedThreadPool(1);
> batchCommit = new BatchCommit();
> super.open(parameters);
> }
>
> @Override
> public void invoke(DriverLbs driverLbs) throws Exception {
> cacheQueue.add(driverLbs);
> if (cach

Re: Flink State Migration Version 1.8.2

2019-10-17 Thread ApoorvK
It is throwing the error below.
The class I am adding variables to has other variables that are objects of classes
which are also in state.

Caused by: org.apache.flink.util.StateMigrationException: The new state
typeSerializer for operator state must not be incompatible.
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:323)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:214)
at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.initializeState(AsyncWaitOperator.java:268)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:740)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Should I use static database connection pool?

2019-10-17 Thread John Smith
Usually the database connection pool is thread safe. When you say task, do you
mean a single deployed Flink job?

I still think a sink is only initialized once. You can prove it by putting logging
in the open and close.

On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma,  wrote:

> Thanks, John. If I don't make my pool static, I think it will create one
> instance for each task. If the pool is static, each JVM can hold one
> instance. Depending on the deployment approach, it can create one to
> multiple instances. Is this correct?
> Konstantin's talk mentions static variables can lead to deadlocks, etc.; I
> don't know if the loss of the JDBC connection is also related to this. Btw, I
> am using JDBC to write to HBase; maybe it also matters.
>
>
> On Thu, Oct 17, 2019 at 2:32 AM John Smith  wrote:
>
>> Xin. The open()/close() cycle of a Sink function is only called once, so I
>> don't think you even need to make your pool static. Someone can confirm
>> this?
>>
>> Miki, the JDBC connector lacks some functionality; for instance, it only
>> flushes batches when the batch interval is reached. So if you set the batch
>> interval to 5 and you get 6 records, the 6th one will not be flushed to the DB
>> until you get another 4. You can see in the code above that Xin has put a
>> timer-based flush as well. Also, the JDBC connector does not have checkpointing if you
>> ever need that, which is a surprise because most JDBC databases have
>> transactions, so it would be nice to have.
>>
>> On Wed, 16 Oct 2019 at 10:58, miki haiat  wrote:
>>
>>> If it's a sink that uses JDBC, why not use the Flink JDBC sink connector?
>>>
>>>
>>> On Wed, Oct 16, 2019, 17:03 Xin Ma  wrote:
>>>
 I have watched one of the recent Flink Forward videos, Apache Flink
 Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
 that we should avoid using static variables to share state between tasks.

 So should I also avoid a static database connection? I am currently facing
 a weird issue where the database connection is lost at some point,
 bringing the whole job down.

 *I have created a database tool like this, *

 public class Phoenix {

 private static ComboPooledDataSource dataSource = new
 ComboPooledDataSource();
 static {
 try {

 dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
 "org.apache.phoenix.jdbc.PhoenixDriver"));
 dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
 null));
 dataSource.setMaxPoolSize(200);
 dataSource.setMinPoolSize(10);
 Properties properties = new Properties();
 properties.setProperty("user", "---");
 properties.setProperty("password", "---");
 dataSource.setProperties(properties);
 } catch (PropertyVetoException e) {
 throw new RuntimeException("phoenix datasource conf error");
 }
 }

 private static Connection getConn() throws SQLException {
 return dataSource.getConnection();
 }

 public static < T > T executeQuery(String sql, Caller < T > caller)
 throws SQLException {
 // .. execution logic
 }

 public static int executeUpdateWithTx(List < String > sqlList)
 throws SQLException {
 // ..update logic
 }

 }

 *Then I implemented my customized sink function like this,*

 public class CustomizedSink extends RichSinkFunction < Record > {

 private static Logger LOG =
 LoggerFactory.getLogger("userFlinkLogger");
 private static final int batchInsertSize = 5000;
 private static final long flushInterval = 60 * 1000 L;
 private long lastFlushTime;
 private BatchCommit batchCommit;
 private ConcurrentLinkedQueue < Object > cacheQueue;
 private ExecutorService threadPool;

 @Override
 public void open(Configuration parameters) throws Exception {
 cacheQueue = new ConcurrentLinkedQueue < > ();
 threadPool = Executors.newFixedThreadPool(1);
 batchCommit = new BatchCommit();
 super.open(parameters);
 }

 @Override
 public void invoke(DriverLbs driverLbs) throws Exception {
 cacheQueue.add(driverLbs);
 if (cacheQueue.size() >= batchInsertSize ||
 System.currentTimeMillis() - lastFlushTime >=
 flushInterval) {
 lastFlushTime = System.currentTimeMillis();
 threadPool.execute(batchCommit);
 }
 }

 private class BatchCommit implements Runnable {
 @Override
 public void run() {
 try {
 int ct;
 synchronized(cacheQueue) {
 List < String > sqlList = Lists.newArrayList();
  

Re: standalone flink savepoint restoration

2019-10-17 Thread Congxian Qiu
Hi
Do you specify the operator ID for all the operators? [1][2] I'm asking
because, from the exception you gave, if you only added new operators and all
the old operators have a specified operator ID, it seems such an exception
should not be thrown.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
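
For reference, assigning explicit IDs looks roughly like the sketch below
(the operator names and the source/sink/function classes are illustrative,
and an existing `env` is assumed):

DataStream<Event> events = env.addSource(new MySource()).uid("event-source");

events
    .keyBy(e -> e.getKey())
    .process(new MyStatefulFunction()).uid("stateful-process")
    .addSink(new MySink()).uid("event-sink");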
Best,
Congxian


Yun Tang  wrote on Thu, Oct 17, 2019 at 12:31 PM:

> Hi Matt
>
> Have you ever configured `high-availability.cluster-id`? If not, a standalone
> Flink job would first try to recover from the high-availability checkpoint
> store named '/default'. If a checkpoint exists there, Flink will always
> restore from that checkpoint with 'allowNonRestoredState' disabled [1] (always
> passing in 'false'). Please consider configuring
> `high-availability.cluster-id` to a different value so that you can
> resume the job while dropping some operators.
>
>
> [1]
> https://github.com/apache/flink/blob/7670e237d7d8d3727537c09b8695c860ea92d467/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java#L190
>
> Best
> Yun Tang
> --
> *From:* Matt Anger 
> *Sent:* Thursday, October 17, 2019 5:46
> *To:* user@flink.apache.org 
> *Subject:* standalone flink savepoint restoration
>
> Hello everyone,
> I am running a flink job in k8s as a standalone HA job. Now I updated my
> job w/ some additional sinks, which I guess have made the checkpoints
> incompatible with the newer version, meaning flink now crashes on bootup
> with the following:
>  Caused by: java.lang.IllegalStateException: There is no operator for the
> state c9b81dfc309f1368ac7efb5864e7b693
>
> So I roll back the deployment, log into the pod and create a savepoint, and
> then modify my args to add
>
> --allowNonRestoredState
> and
> -s 
>
> but it doesn't look like the standalone cluster is respecting those
> arguments. I've tried searching around, but haven't found any solutions.
> The docker image I have is running the docker-entrypoint.sh and the full
> arg list is below, as copy-pasted out of my k8s yaml file:
>
>  47 - job-cluster
>  48 - -Djobmanager.rpc.address=$(SERVICE_NAME)
>  49 - -Djobmanager.rpc.port=6123
>  50 - -Dresourcemanager.rpc.port=6123
>  51 - -Dparallelism.default=$(NUM_WORKERS)
>  52 - -Dblob.server.port=6124
>  53 - -Dqueryable-state.server.ports=6125
>  54 - -Ds3.access-key=$(AWS_ACCESS_KEY_ID)
>  55 - -Ds3.secret-key=$(AWS_SECRET_ACCESS_KEY)
>  56 - -Dhigh-availability=zookeeper
>  57 - -Dhigh-availability.jobmanager.port=50010
>  58 - -Dhigh-availability.storageDir=$(S3_HA)
>  59 - -Dhigh-availability.zookeeper.quorum=$(ZK_QUORUM)
>  60 - -Dstate.backend=filesystem
>  61 - -Dstate.checkpoints.dir=$(S3_CHECKPOINT)
>  62 - -Dstate.savepoints.dir=$(S3_SAVEPOINT)
>  63 - --allowNonRestoredState
>  64 - -s $(S3_SAVEPOINT)
>
> I originally didn't have the last 2 args, I added them based upon various
> emails I saw on this list and other google search results, to no avail.
>
> Thanks
> -Matt
>


ElasticSearch failing when parallelised

2019-10-17 Thread Nicholas Walton
Hi,

I'm running Elasticsearch as a sink for a batch job processing a CSV file of
6.2 million rows, with each row containing 181 numeric values. It quite happily
processes a small example of around 2,000 rows, running each column through a
single parallel pipeline, keyed by column number.

However, once I scale up to the full data size with parallelism set higher
than one (typically eight), after a while Elasticsearch fails as below.
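
(For reference, one knob that often mitigates this kind of bulk-request retry
timeout is the sink builder's flush/backoff settings; the sketch below is
illustrative only, with a placeholder host, sizes and element type.)

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;

public class TunedEsSink {
    // Builds an ES6 sink with smaller bulk requests and backoff retries enabled.
    public static <T> ElasticsearchSink<T> build(ElasticsearchSinkFunction<T> sinkFunction) {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));
        ElasticsearchSink.Builder<T> builder = new ElasticsearchSink.Builder<>(hosts, sinkFunction);
        builder.setBulkFlushMaxActions(500);     // flush smaller bulks more often
        builder.setBulkFlushBackoff(true);       // back off and retry instead of failing fast
        builder.setBulkFlushBackoffRetries(5);
        builder.setBulkFlushBackoffDelay(1000L); // delay between retries, in ms
        return builder.build();
    }
}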

2019-10-17 09:36:09,550 ERROR 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase  - 
Failed Elasticsearch bulk request: request retries exceeded max retry timeout 
[3]
java.io.IOException: request retries exceeded max retry timeout [3]
at 
org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at 
org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at 
org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at 
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)
2019-10-17 09:36:09,576 INFO  org.example.Job$  
- Failed ElasticSearch document. Exception rethrown
2019-10-17 09:36:09,624 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
disposal of stream operator.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:343)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:380)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: request retries exceeded max retry timeout 
[3]
at 
org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
at 
org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at 
org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at 
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
... 1 more
2019-10-17 09:36:09,629 INFO  org.apache.flink.runtime.taskmanager.Task 
- Window(GlobalWindows(), CountTrigger, CountEvictor, 
ScalaProcessWindowFunctionWrapper) -> Map -> Map -> Map -> Sink