[jira] [Created] (FLINK-8541) Mesos RM should recover from failover timeout

2018-01-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8541:
---

 Summary: Mesos RM should recover from failover timeout
 Key: FLINK-8541
 URL: https://issues.apache.org/jira/browse/FLINK-8541
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management, Mesos
Affects Versions: 1.3.0
Reporter: Eron Wright 
Assignee: Eron Wright 


When a framework disconnects unexpectedly from Mesos, the framework's Mesos 
tasks continue to run for a configurable period of time known as the failover 
timeout. If the framework reconnects to Mesos after the timeout has expired, 
Mesos rejects the connection attempt. The framework is then expected to 
discard its previous framework ID and connect as a new framework.

When Flink is in this situation, the only recourse is to manually delete the ZK 
state where the framework ID is kept. Let's improve the logic of the Mesos RM 
to automate that.
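
A rough sketch of what the automated recovery could look like, reacting to the 
error callback of the Mesos scheduler API (the {{workerStore}} and 
{{recreateFramework()}} helpers below are hypothetical names, not existing 
Flink APIs):

// Hypothetical sketch: when Mesos rejects re-registration after the failover
// timeout, discard the stored framework ID and register as a new framework.
@Override
public void error(SchedulerDriver driver, String message) {
    if (message.contains("Framework has been removed")) {
        // the old framework ID is no longer valid; clear it from ZooKeeper
        workerStore.removeFrameworkId();
        // re-register without a framework ID, i.e. as a new framework
        recreateFramework();
    } else {
        onFatalError(new Exception(message));
    }
}

(Scheduler#error(SchedulerDriver, String) is the standard Mesos Java scheduler 
callback; the exact error message to match would need to be confirmed against 
the Mesos master.)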





[jira] [Created] (FLINK-8540) FileStateHandles must not attempt to delete their parent directory.

2018-01-31 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8540:
---

 Summary: FileStateHandles must not attempt to delete their parent 
directory.
 Key: FLINK-8540
 URL: https://issues.apache.org/jira/browse/FLINK-8540
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0


Currently, every file disposal checks if the parent directory is now empty, and 
deletes it if that is the case. That is not only inefficient, but prohibitively 
expensive on some systems, like Amazon S3.

With the resolution of [FLINK-8539], this will no longer be necessary.
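
For illustration, the disposal pattern being removed looks roughly like this 
(a simplified sketch, not the exact Flink code):

// Simplified sketch of the per-file disposal pattern this issue removes.
// The listing after each delete is an extra round trip; on object stores
// such as Amazon S3, that is a LIST request per disposed state file.
void disposeStateFile(FileSystem fs, Path file) throws IOException {
    fs.delete(file, false);
    Path parent = file.getParent();
    FileStatus[] remaining = fs.listStatus(parent); // expensive on S3
    if (remaining != null && remaining.length == 0) {
        fs.delete(parent, false); // try to remove the now-empty parent
    }
}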





[jira] [Created] (FLINK-8539) Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal of checkpoint storage locations

2018-01-31 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8539:
---

 Summary: Introduce "CompletedCheckpointStorageLocation" to 
explicitly handle disposal of checkpoint storage locations
 Key: FLINK-8539
 URL: https://issues.apache.org/jira/browse/FLINK-8539
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0


The storage location of completed checkpoints lacks a proper representation. 
Because of that, there is no place that can handle the deletion of a checkpoint 
directory, or the dropping of a checkpoint-specific table.

The current workaround for file systems is, for example, that every file 
disposal checks whether the parent directory is now empty, and deletes it if 
that is the case. That is not only inefficient, but prohibitively expensive on 
some systems, like Amazon S3.

Properly representing the storage location for completed checkpoints allows us 
to add a disposal call for that location.

That {{CompletedCheckpointStorageLocation}} can also be used to capture 
"external pointers", metadata, and even allow us to use custom serialization 
and deserialization of the metadata in the future, making the abstraction more 
extensible by allowing users to introduce new types of state handles.
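
A minimal sketch of what such an abstraction could look like (method names are 
illustrative; the final API may differ):

// Illustrative sketch of the proposed abstraction; names are not final.
public interface CompletedCheckpointStorageLocation {

    // external pointer to the checkpoint, e.g. a file system path
    String getExternalPointer();

    // handle to the metadata of the checkpoint stored at this location
    StreamStateHandle getMetadataHandle();

    // dispose the entire location in one call, e.g. delete the checkpoint
    // directory or drop the checkpoint-specific table
    void disposeStorageLocation() throws IOException;
}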





Re: Fwd: Re: Kinesis consumer shard skew - FLINK-8516

2018-01-31 Thread Tzu-Li (Gordon) Tai
Hi Thomas,

Thanks a lot for opening the PR.
I had a look at it and the comment you added, and left my thoughts there. In 
general, I think it’s heading in the right direction.

Cheers,
Gordon

On 31 January 2018 at 4:03:36 PM, Thomas Weise (t...@apache.org) wrote:

Hi Gordon,

Can you have a quick look at the PR and the comment I added? That will help 
polish it up and make it ready for review.

Thanks!
Thomas

--
sent from mobile
-- Forwarded message --
From: "Thomas Weise" 
Date: Jan 30, 2018 5:53 PM
Subject: Re: Kinesis consumer shard skew - FLINK-8516
To: "Tzu-Li (Gordon) Tai" 
Cc: 

I created a PR for further discussion:

https://github.com/apache/flink/pull/5393

There are a few TODOs where I think improvements can be made. Let me know if 
you agree with the overall direction.

Thanks,
Thomas


On Mon, Jan 29, 2018 at 3:01 PM, Thomas Weise  wrote:

What do you mean by checkpoint “assignments”? The shard-to-source assignment is 
only fixed within a single execution of the job. We only checkpoint the 
progress of each shard in the state.
Given that we support plugging in custom shard assignment hashing, the 
assignment could potentially change every time we restore.

If what you mean is actually retaining checkpointed shard state (i.e. the 
progress sequence number), then:
I don’t really see a reason why a user would want to ignore checkpointed shard 
sequence numbers, but that could just be my lack of knowledge of possible real 
user scenarios.
That said, ignoring checkpointed shard sequence numbers on restore from a 
savepoint would immediately break exactly-once guarantees, so if we do have a 
case for that, we need to clearly document its use and side effects.


At the moment only the shard offsets are saved, and not the subtask 
association. With "checkpointed assignments" I meant saving which shards belong 
to which subtask, but that may lead to problems when changing the consumer 
parallelism.

It seems that balanced distribution is hard to achieve without synchronization 
between subtasks. There is the possibility of subtasks intermittently 
retrieving different shard lists while resharding occurs. The assignment logic 
either needs to consider only the shard ID, which results in skewed 
distribution (the current implementation), or there needs to be a barrier at 
which each subtask is guaranteed to see the same shard list, which would allow 
round-robin distribution.
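
To make the trade-off concrete, a rough sketch of the two strategies 
(signatures simplified for illustration, not the actual consumer interfaces):

// Current behavior: hash only the shard ID. Deterministic without any
// coordination between subtasks, but shard IDs can hash unevenly, so some
// subtasks end up with many more shards than others.
static int assignByShardId(String shardId, int numSubtasks) {
    return Math.abs(shardId.hashCode() % numSubtasks);
}

// Round-robin over the full shard list: balanced, but only correct if every
// subtask is guaranteed to see the same shard list in the same order, which
// is exactly the synchronization barrier described above.
static int assignRoundRobin(java.util.List<String> sortedShardIds,
                            String shardId, int numSubtasks) {
    return sortedShardIds.indexOf(shardId) % numSubtasks;
}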

In order to rebase the mapping after resharding, we would probably need all 
subtasks to agree on the shard list and the most recent offsets (distributed 
consensus) and apply the changes at a checkpoint barrier? I really don't see 
how else we can end up with balanced shard distribution as a generic solution.

Thanks,
Thomas




[jira] [Created] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-01-31 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8538:
---

 Summary: Add a Kafka table source factory with JSON format support
 Key: FLINK-8538
 URL: https://issues.apache.org/jira/browse/FLINK-8538
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther


Similar to {{CSVTableSourceFactory}}, a Kafka table source factory for JSON 
should be added. This issue includes improving the existing JSON descriptor 
with validation that can be used for other connectors as well. It is up for 
discussion whether we want to split the KafkaJsonTableSource into connector and 
format such that we can reuse the format for other table sources as well.
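
The kind of programmatic specification this would enable might look roughly as 
follows (the descriptor API is still under discussion in FLINK-8535, so all 
names below are illustrative rather than a finalized API):

// Illustrative only: descriptor names reflect the direction discussed in
// FLINK-8535, not a finalized API.
tableEnv
    .connect(new Kafka()
        .version("0.11")
        .topic("orders"))
    .withFormat(new Json()
        .failOnMissingField(true))
    .withSchema(new Schema()
        .field("orderId", Types.LONG)
        .field("amount", Types.DOUBLE))
    .registerTableSource("Orders");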





[jira] [Created] (FLINK-8537) Add a Kafka table source factory with Avro format support

2018-01-31 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8537:
---

 Summary: Add a Kafka table source factory with Avro format support
 Key: FLINK-8537
 URL: https://issues.apache.org/jira/browse/FLINK-8537
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther


Similar to {{CSVTableSourceFactory}}, a Kafka table source factory should be 
added. This issue includes creating an {{Avro}} descriptor with validation that 
can be used for other connectors as well. It is up for discussion whether we 
want to split the KafkaAvroTableSource into connector and format such that we 
can reuse the format for other table sources as well.





[jira] [Created] (FLINK-8536) Improve CSVTableSourceFactory to support all descriptor properties

2018-01-31 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8536:
---

 Summary: Improve CSVTableSourceFactory to support all descriptor 
properties
 Key: FLINK-8536
 URL: https://issues.apache.org/jira/browse/FLINK-8536
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


The {{CSVTableSourceFactory}} is the first built-in table source factory. 
However, its features are still limited and need to be improved. For example, 
it is not possible to specify time attributes. Additionally, the schema must 
always match the format, which should not be necessary. {{Schema.from()}} must 
validate whether the field exists in the format.
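
A hypothetical illustration of the intended {{Schema.from()}} validation 
(field and format names are made up for the example):

// A schema field can be renamed from a format field via from(); referencing
// a format field that does not exist should fail validation immediately.
new Schema()
    .field("user", Types.STRING).from("user_name")       // ok: exists in format
    .field("bogus", Types.STRING).from("no_such_field"); // should fail fast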





[jira] [Created] (FLINK-8535) Implement a basic set of table source factories

2018-01-31 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8535:
---

 Summary: Implement a basic set of table source factories
 Key: FLINK-8535
 URL: https://issues.apache.org/jira/browse/FLINK-8535
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


Since FLINK-8240 has been merged, we can now add more table source factories 
for different connectors and formats, including descriptors for specifying them 
programmatically. This is an umbrella issue for tracking all efforts around 
table descriptors and table source factories.





Re: [DISCUSS] Releasing Flink 1.4.1

2018-01-31 Thread Tzu-Li (Gordon) Tai
Thanks a lot for the wrap-up Chesnay.

For open issues with a PR already:

[FLINK-8275] Flink YARN TMs using wrong local Kerberos keytab path 
[FLINK-8318] Shade all Elasticsearch connector dependencies 
I am currently reviewing these two.

[FLINK-8409] KafkaConsumerThread potential NPE 
[FLINK-8419] No metrics registered for dynamically discovered Kafka partitions 
These have been reviewed and are mergeable.

[FLINK-8421] HeapInternalTimerService does not reconfigure serializers on 
restore 

This is still waiting for a review, but I think Aljoscha will start reviewing 
it soon.


For the still-unassigned issues, I recommend that we start kicking off a 1.4.1 
RC by the end of this week with what we have, and leave those for a fast 1.4.2 
release afterwards.

The reason is that we’ve already collected quite a few very critical fixes for 
1.4.1 that users have reported and are waiting on, so blocking the 1.4.1 
release any longer on still-unassigned issues might not be a good idea.

I would also be happy to be the release manager for the 1.4.1 release.

Cheers,
Gordon

On 31 January 2018 at 11:55:06 AM, Chesnay Schepler (ches...@apache.org) wrote:

Here's a list of all issues raised in this thread and their respective  
state:  

Unassigned:  

[FLINK-7756] RocksDB state backend Checkpointing (Async and Incremental) is not 
working with CEP.  
[FLINK-7760] Restore failing from external checkpointing metadata.  
[FLINK-8042] retry individual failover-strategy for some time first before 
reverting to full job restart  

PR open:  

[FLINK-8275] Flink YARN TMs using wrong local Kerberos keytab path  
[FLINK-8318] Shade all Elasticsearch connector dependencies  
[FLINK-8409] KafkaConsumerThread potential NPE  
[FLINK-8419] No metrics registered for dynamically discovered Kafka partitions  
[FLINK-8421] HeapInternalTimerService does not reconfigure serializers on 
restore  

Resolved:  

[FLINK-7949] AsyncWaitOperator is not restarting when queue is full  
[FLINK-8226] Dangling reference generated after NFA clean up timed out 
SharedBufferEntry  
[FLINK-8295] Cassandra connector has incorrect Netty shading  
[FLINK-8325] Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)  
[FLINK-8355] DataSet Should not union a NULL row for AGG without GROUP BY 
clause.  
[FLINK-8406] BucketingSink does not detect hadoop file systems  
[FLINK-8043] change fullRestarts (for fine grained recovery) from gauge to 
counter

On 15.01.2018 11:49, Tzu-Li (Gordon) Tai wrote:  
> Hi all,  
>  
> I would like to bring up the discussion of releasing Flink 1.4.1.  
> We’ve collected a few issues over the past few weeks, some of which are quite  
> critical.  
>  
> Here’s a list of issues, and their current status, that I’m aware of and IMO  
> should be included:  
> - [FLINK-8275] Flink YARN TMs using wrong local Kerberos keytab path  
> (pending PR)  
> - [FLINK-8421] HeapInternalTimerService does not reconfigure serializers on  
> restore (assigned, no PR)  
> - [FLINK-8318] Shade all Elasticsearch connector dependencies (pending PR)  
> - [FLINK-8419] No metrics registered for dynamically discovered Kafka  
> partitions (assigned, no PR)  
> - [FLINK-8409] KafkaConsumerThread potential NPE (assigned, no PR)  
> - [FLINK-8295] Cassandra connector has incorrect Netty shading (merged)  
>  
> Also FYI, here [1] is a list of tickets that were marked as a blocker for  
> 1.4.1 on JIRA.  
> [1]  
> https://issues.apache.org/jira/browse/FLINK-8263?filter=-1&jql=project%20%3D%20FLINK%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%201.4.1%20order%20by%20updated%20DESC
>   
>  
> If you think something is missing and should be added for 1.4.1, could you  
> please add them to the JIRA and let others know here?  
>  
> Cheers,  
> Gordon  
>  
>  
>  
> --  
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/  
>  



[jira] [Created] (FLINK-8534) Inserting too many bucket entries into one bucket in a join inside an iteration causes a java.io.FileNotFoundException when releasing spill files

2018-01-31 Thread zhu.qing (JIRA)
zhu.qing created FLINK-8534:
---

 Summary: Inserting too many bucket entries into one bucket in a join 
inside an iteration causes a java.io.FileNotFoundException when releasing 
spill files
 Key: FLINK-8534
 URL: https://issues.apache.org/jira/browse/FLINK-8534
 Project: Flink
  Issue Type: Bug
 Environment: Windows, IntelliJ IDEA, 8 GB RAM, 4-core i5 CPU. Flink 1.4.0
Reporter: zhu.qing


When too many entries are inserted into one bucket, the hash table spills the 
partition via spillPartition(), which creates a build-side channel writer:

this.buildSideChannel = ioAccess.createBlockChannelWriter(targetChannel, 
bufferReturnQueue); 

In prepareNextPartition() of the reopenable mutable hash table, 
furtherPartitioning is set to true, so finalizeProbePhase() deletes the spill 
files:

freeMemory.add(this.probeSideBuffer.getCurrentSegment());

// delete the spill files
this.probeSideChannel.close();
this.buildSideChannel.deleteChannel();
this.probeSideChannel.deleteChannel();

After deleteChannel(), the next iteration fails with a 
java.io.FileNotFoundException because the spill file has already been released.





[DISCUSS] Releasing Flink 1.5.0

2018-01-31 Thread Aljoscha Krettek
Hi Everyone,

When we decided to do the 1.4.0 release a while back we did that to get a 
stable release out before putting in a couple of new features. Back then, some 
of those new features (FLIP-6, network stack changes, local state recovery) 
were almost ready and we wanted to do a shortened 1.5.0 development cycle to 
allow for those features to become ready and then do the next release.

We are now approaching the approximate time where we wanted to do the Flink 
1.5.0 release so I would like to gauge where we are and gather opinions on how 
we should proceed now.

With this, I'd also like to propose myself as the release manager for 1.5.0 but 
I'm very happy to yield if someone else would be interested in doing that.

What do you think?

Best,
Aljoscha

[jira] [Created] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-01-31 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-8533:
---

 Summary: Support MasterTriggerRestoreHook state reinitialization
 Key: FLINK-8533
 URL: https://issues.apache.org/jira/browse/FLINK-8533
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Eron Wright 
Assignee: Eron Wright 


{{MasterTriggerRestoreHook}} enables coordination with an external system for 
taking or restoring checkpoints. When execution is restarted from a checkpoint, 
{{restoreCheckpoint}} is called to restore or reinitialize the external system 
state. There's an edge case where the external state is not adequately 
reinitialized, that is when execution fails _before the first checkpoint_. In 
that case, the hook is not invoked and has no opportunity to restore the 
external state to initial conditions.

The impact is a loss of exactly-once semantics in this case. For example, in 
the Pravega source function, the reader group state (e.g. stream position data) 
is stored externally. In the normal restore case, the reader group state is 
forcibly rewound to the checkpointed position. In the edge case where no 
checkpoint has yet been successful, the reader group state is not rewound and 
consequently some amount of stream data is not reprocessed.

A possible fix would be to introduce an {{initializeState}} method on the hook 
interface. Similar to {{CheckpointedFunction::initializeState}}, this method 
would be invoked unconditionally upon hook initialization. The Pravega hook 
would, for example, initialize or forcibly reinitialize the reader group state. 
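
A sketch of the proposed addition (the method name and exact signature are 
open for discussion):

// Sketch of the proposed extension to MasterTriggerRestoreHook.
public interface MasterTriggerRestoreHook<T> {

    // ...existing methods: getIdentifier(), triggerCheckpoint(...),
    // restoreCheckpoint(...), createCheckpointDataSerializer()

    // Proposed: invoked unconditionally when the hook is initialized, giving
    // the hook a chance to reset external state to initial conditions even
    // when no checkpoint has completed yet (analogous to
    // CheckpointedFunction#initializeState).
    void initializeState() throws Exception;
}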
   


