[FLINK - 9503] Starter task

2018-06-01 Thread Deepak Sharma
Hi devs,

I've been using Flink for about a year now. I was wondering if I could
start contributing with this sub-task:

https://issues.apache.org/jira/browse/FLINK-9503

related to this Task:

https://issues.apache.org/jira/browse/FLINK-2032

Could someone give me permissions in JIRA so that I can assign the sub-task to myself?

Thanks,
Deepak


[jira] [Created] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-01 Thread Deepak Sharma (JIRA)
Deepak Sharma created FLINK-9503:


 Summary: Migrate integration tests for iterative aggregators
 Key: FLINK-9503
 URL: https://issues.apache.org/jira/browse/FLINK-9503
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Deepak Sharma


Migrate the integration tests in org.apache.flink.test.iterative.aggregators to
use collect() instead of temp files. Related to the parent JIRA issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9502) Use generic parameter search for user-defined functions when argument contains Object type

2018-06-01 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9502:


 Summary: Use generic parameter search for user-defined functions
when argument contains Object type
 Key: FLINK-9502
 URL: https://issues.apache.org/jira/browse/FLINK-9502
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Rong Rong
Assignee: Rong Rong








[jira] [Created] (FLINK-9501) Allow Object.class type in user-defined functions as parameter types but not result types

2018-06-01 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9501:


 Summary: Allow Object.class type in user-defined functions as
parameter types but not result types
 Key: FLINK-9501
 URL: https://issues.apache.org/jira/browse/FLINK-9501
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Rong Rong
Assignee: Rong Rong








[jira] [Created] (FLINK-9500) FileUploadHandler does not handle EmptyLastHttpContent

2018-06-01 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9500:

 Summary: FileUploadHandler does not handle EmptyLastHttpContent
 Key: FLINK-9500
 URL: https://issues.apache.org/jira/browse/FLINK-9500
 Project: Flink
  Issue Type: Bug
  Components: Client, REST
Affects Versions: 1.5.0, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.6.0, 1.5.1


The FileUploadHandler does not properly handle {{EmptyLastHttpContent}}
messages. When such a message is passed to the decoder, an exception is thrown.

We should either catch the exception or not pass such messages to the decoder
in the first place.

{code}
2018-06-01 16:05:46,514 WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exception
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder$EndOfDataDecoderException
 at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.hasNext(HttpPostMultipartRequestDecoder.java:366)
 at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.hasNext(HttpPostRequestDecoder.java:241)
 at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:92)
 at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:51)
 at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(A
{code}





[jira] [Created] (FLINK-9499) Allow REST API for running a job to provide job configuration as body of POST request

2018-06-01 Thread Esteban Serrano (JIRA)
Esteban Serrano created FLINK-9499:

 Summary: Allow REST API for running a job to provide job 
configuration as body of POST request
 Key: FLINK-9499
 URL: https://issues.apache.org/jira/browse/FLINK-9499
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.3.2
Reporter: Esteban Serrano


Based on
[this|https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs]
 documentation, the REST API provides a way to submit a request for running a
Flink job. The POST request must include the job configuration as query
parameters, using the documented parameter names ("program-args",
"entry-class", "parallelism", etc.).

Depending on the job parameters, the full URL of the POST request can exceed
the maximum size allowed by the Netty configuration (currently 4096 bytes). To
overcome this, it would be useful to let users provide the job configuration
not only as query parameters but also in the body of the POST request.

For the most part, it is the "program-args" parameter that makes the URL grow,
depending on the needs of the developer and the job. All other attributes
should be fairly constant.
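To make the size problem concrete, the sketch below URL-encodes a set of run parameters the way a query string would carry them and compares the result with the 4096-byte limit quoted above. Only the parameter names come from the linked documentation; the host, port, jar id, class name, and argument values are hypothetical placeholders, and the effective limit depends on how Netty is configured.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class RunUrlLength {
    // Limit quoted in the issue; the effective value depends on the Netty setup.
    static final int MAX_URL_BYTES = 4096;

    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    /** Appends the job parameters to the base URL as encoded query parameters. */
    static String buildRunUrl(String base, Map<String, String> params) {
        StringBuilder sb = new StringBuilder(base);
        char separator = '?';
        for (Map.Entry<String, String> e : params.entrySet()) {
            sb.append(separator).append(enc(e.getKey())).append('=').append(enc(e.getValue()));
            separator = '&';
        }
        return sb.toString();
    }

    static boolean exceedsLimit(String url) {
        return url.getBytes(StandardCharsets.UTF_8).length > MAX_URL_BYTES;
    }

    public static void main(String[] args) {
        // Hypothetical job that receives many input paths through "program-args".
        StringBuilder programArgs = new StringBuilder();
        for (int i = 0; i < 150; i++) {
            programArgs.append("--input hdfs:///data/part-").append(i).append(' ');
        }
        Map<String, String> params = new LinkedHashMap<>();
        params.put("entry-class", "com.example.MyJob");
        params.put("parallelism", "4");
        params.put("program-args", programArgs.toString().trim());

        String url = buildRunUrl("http://localhost:8081/jars/my-job.jar/run", params);
        System.out.println("URL length: " + url.length() + " bytes, exceeds limit: " + exceedsLimit(url));
    }
}
```

Encoding makes the problem worse than the raw argument length suggests: every `:` and `/` in a path expands to three characters, so "program-args" dominates the URL size exactly as described.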





[jira] [Created] (FLINK-9498) Disable dependency convergence for "flink-end-to-end-tests"

2018-06-01 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-9498:

 Summary: Disable dependency convergence for 
"flink-end-to-end-tests"
 Key: FLINK-9498
 URL: https://issues.apache.org/jira/browse/FLINK-9498
 Project: Flink
  Issue Type: Sub-task
  Components: Build System
Reporter: Hai Zhou
Assignee: Hai Zhou








[jira] [Created] (FLINK-9497) Job Configuration showing the password in plain text

2018-06-01 Thread Vinay (JIRA)
Vinay created FLINK-9497:


 Summary: Job Configuration showing the password in plain text
 Key: FLINK-9497
 URL: https://issues.apache.org/jira/browse/FLINK-9497
 Project: Flink
  Issue Type: Bug
  Components: Security
Affects Versions: 1.3.2
Reporter: Vinay


I am submitting the job using a remote execution environment. When I click on a
particular job, all of its configuration values are shown in plain text. Some of
these contain passwords (truststore, keystore, DB, etc.), which should be masked
in the same way as on the JobManager configuration screen.
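The requested masking can be sketched as a key-based filter applied before configuration values are rendered. The sensitive-key fragments ("password", "secret"), the placeholder string, and the class name are assumptions for illustration, not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class ConfigMasking {
    // Assumed list of key fragments that mark a value as sensitive.
    static final String[] SENSITIVE_FRAGMENTS = {"password", "secret"};
    static final String HIDDEN = "******";

    /** Case-insensitive check of the key against the sensitive fragments. */
    static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        for (String fragment : SENSITIVE_FRAGMENTS) {
            if (lower.contains(fragment)) {
                return true;
            }
        }
        return false;
    }

    /** Returns a copy of the configuration that is safe to show in a UI. */
    static Map<String, String> maskForDisplay(Map<String, String> config) {
        Map<String, String> masked = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            masked.put(e.getKey(), isSensitive(e.getKey()) ? HIDDEN : e.getValue());
        }
        return masked;
    }

    public static void main(String[] args) {
        Map<String, String> jobConfig = new LinkedHashMap<>();
        jobConfig.put("security.ssl.keystore-password", "changeit");
        jobConfig.put("db.url", "jdbc:postgresql://db:5432/app");
        jobConfig.put("db.password", "hunter2");
        System.out.println(maskForDisplay(jobConfig));
    }
}
```

Masking at render time keeps the real values available to the job while preventing them from leaking through the web UI.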





[jira] [Created] (FLINK-9495) Implement ResourceManager for Kubernetes

2018-06-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9495:

 Summary: Implement ResourceManager for Kubernetes
 Key: FLINK-9495
 URL: https://issues.apache.org/jira/browse/FLINK-9495
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Affects Versions: 1.5.0
Reporter: Elias Levy


I noticed there is no issue for developing a Kubernetes-specific
ResourceManager under FLIP-6, so I am creating this issue.





[ANNOUNCE] Flink Forward Berlin 2018 - Call for Presentations extended until June 11

2018-06-01 Thread Fabian Hueske
Hi everybody,

Due to popular demand, we've extended the Call for Presentations for Flink
Forward Berlin 2018 by one week.
The call will close on *Monday, June 11* (11:59pm CEST).

Please submit a proposal to present your Flink and Stream Processing use
case, experiences, and best practices in Berlin.
For the first time, Flink Forward will host a dedicated Research track to
share and discuss novel ideas and approaches.

You can submit your talk proposal at
https://flink-forward.org/call-for-presentations-submit-talk/

Best regards,
Fabian

(PC Chair for Flink Forward Berlin 2018)


[jira] [Created] (FLINK-9494) Race condition in Dispatcher with concurrent granting and revoking of leadership

2018-06-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9494:


 Summary: Race condition in Dispatcher with concurrent granting and
revoking of leadership
 Key: FLINK-9494
 URL: https://issues.apache.org/jira/browse/FLINK-9494
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


The {{Dispatcher}} contains a race condition when an instance is granted
leadership and then, shortly afterwards, has its leadership revoked. The problem
is that the callback of the recovered-jobs future does not check whether we
still hold the leadership. This can lead to a corrupted state of the
{{Dispatcher}}.
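A minimal sketch of the missing check, assuming the recovered jobs arrive as a `CompletableFuture` and that leadership is identified by a session UUID: the callback compares the session it was started under with the current one and drops the result if leadership was revoked in between. The class and method names are hypothetical stand-ins, not the Dispatcher's code, and a plain monitor stands in for whatever serialization of callbacks the real component uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

class LeaderGuardedRecovery {
    private UUID leaderSessionId; // null while this instance is not the leader
    private final List<String> runningJobs = new ArrayList<>();

    synchronized void grantLeadership(UUID sessionId, CompletableFuture<List<String>> recoveredJobs) {
        leaderSessionId = sessionId;
        recoveredJobs.thenAccept(jobs -> {
            synchronized (this) {
                // The guard: apply the result only if leadership was not revoked
                // (or re-granted under a new session) while recovery was running.
                if (sessionId.equals(leaderSessionId)) {
                    runningJobs.addAll(jobs);
                }
            }
        });
    }

    synchronized void revokeLeadership() {
        leaderSessionId = null;
        runningJobs.clear();
    }

    synchronized List<String> runningJobs() {
        return new ArrayList<>(runningJobs);
    }

    public static void main(String[] args) {
        LeaderGuardedRecovery dispatcher = new LeaderGuardedRecovery();
        CompletableFuture<List<String>> recovery = new CompletableFuture<>();
        dispatcher.grantLeadership(UUID.randomUUID(), recovery);
        dispatcher.revokeLeadership(); // revoked before recovery finishes
        recovery.complete(Collections.singletonList("job-1"));
        System.out.println(dispatcher.runningJobs()); // prints [] because the stale result is ignored
    }
}
```

Comparing session ids rather than a boolean "is leader" flag also covers a revoke followed by a quick re-grant, where a boolean would wrongly accept the old recovery result.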





[jira] [Created] (FLINK-9493) Forward exception when releasing a TaskManager at the SlotPool

2018-06-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9493:


 Summary: Forward exception when releasing a TaskManager at the 
SlotPool
 Key: FLINK-9493
 URL: https://issues.apache.org/jira/browse/FLINK-9493
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.6.0, 1.5.1


We should add a cause when calling {{SlotPool#releaseTaskManager}} because the 
release operation can be the final root cause of the job failure. Currently, we 
create a generic exception which hides the true reason for the release 
operation.
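The proposed change amounts to threading a cause through the release call so the root cause survives in the exception chain. A minimal sketch with hypothetical types (string ids and plain maps instead of the SlotPool's real bookkeeping); the actual `SlotPool#releaseTaskManager` signature may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

class SlotPoolSketch {
    // Hypothetical bookkeeping: slot id -> id of the TaskManager that offered it.
    private final Map<String, String> slotOwners = new HashMap<>();
    // Hypothetical: the failure handed to whoever was using a released slot.
    private final Map<String, Exception> slotFailures = new HashMap<>();

    void offerSlot(String slotId, String taskManagerId) {
        slotOwners.put(slotId, taskManagerId);
    }

    /** Releases every slot of the TaskManager, keeping the caller's cause in the chain. */
    void releaseTaskManager(String taskManagerId, Exception cause) {
        slotOwners.entrySet().removeIf(entry -> {
            if (!entry.getValue().equals(taskManagerId)) {
                return false;
            }
            // Wrap instead of replacing, so the true reason stays reachable via getCause().
            slotFailures.put(entry.getKey(),
                    new Exception("Releasing TaskManager " + taskManagerId + ".", cause));
            return true;
        });
    }

    Exception failureOf(String slotId) {
        return slotFailures.get(slotId);
    }

    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();
        pool.offerSlot("slot-1", "tm-1");
        pool.releaseTaskManager("tm-1", new TimeoutException("Heartbeat of TaskManager tm-1 timed out."));
        // The failure now points at the heartbeat timeout instead of a generic message.
        System.out.println(pool.failureOf("slot-1").getCause().getMessage());
    }
}
```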





[jira] [Created] (FLINK-9492) Failed to build RexCall with SqlDatetimeSubtractionOperator

2018-06-01 Thread yuqi (JIRA)
yuqi created FLINK-9492:

 Summary: Failed to build RexCall with 
SqlDatetimeSubtractionOperator
 Key: FLINK-9492
 URL: https://issues.apache.org/jira/browse/FLINK-9492
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.0
 Environment: 
{code:java}
public static void main(String[] args) {
    try {
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        rootSchema.add("USERS", new AbstractTable() {
            public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
                builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
                }, SqlTypeName.INTEGER));
                builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
                }, SqlTypeName.CHAR));

                builder.add("TIME_D", new BasicSqlType(new RelDataTypeSystemImpl() {
                }, SqlTypeName.TIMESTAMP));
                return builder.build();
            }
        });

        rootSchema.add("TABLE_RESULT", new AbstractTable() {
            public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
                RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
                builder.add("ID", new BasicSqlType(new RelDataTypeSystemImpl() {
                }, SqlTypeName.INTEGER));
                builder.add("NAME", new BasicSqlType(new RelDataTypeSystemImpl() {
                }, SqlTypeName.CHAR));
                builder.add("SCORE", new BasicSqlType(new RelDataTypeSystemImpl() {
                }, SqlTypeName.INTEGER));
                return builder.build();
            }
        });
        final FrameworkConfig config = Frameworks.newConfigBuilder()
                .parserConfig(SqlParser.Config.DEFAULT)
                .defaultSchema(rootSchema)
                .build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode parse1 = planner.parse("insert into table_result(id, name, score) select a.id as id, a.name as name, 1 from users a where month(a.time_d - interval '30' day) >= 2");
        SqlNode validate = planner.validate(parse1);

        RelRoot root = planner.rel(validate);

        RexBuilder builder1 = root.rel.getCluster().getRexBuilder();
        LogicalFilter filter = (LogicalFilter) root.rel.getInput(0).getInput(0);

        //get RexCall of SqlDatetimeSubtractionOperator
        RexCall call = (RexCall) ((RexCall) ((RexCall) filter.getCondition()).operands.get(0)).getOperands().get(1);

        builder1.makeCall(call.getOperator(), call.getOperands());

        HepProgramBuilder builder = new HepProgramBuilder();

        //builder.addRuleInstance(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
        //builder.addRuleInstance(FilterJoinRule.JOIN);
        builder.addRuleCollection(Programs.RULE_SET);
        HepPlanner hepPlanner = new HepPlanner(builder.build());
        hepPlanner.setRoot(root.rel);
        RelNode node = hepPlanner.findBestExp();

        System.out.println("After>");
        System.out.print(RelOptUtil.toString(node));

    } catch (Exception e) {
        e.printStackTrace();
    }
}
{code}

Reporter: yuqi
Assignee: yuqi


When using RexBuilder to build a RexCall with SqlDatetimeSubtractionOperator,
an exception is thrown:


{code:java}
java.lang.IndexOutOfBoundsException: index (2) must be less than size (2)
at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:67)
at org.apache.calcite.rex.RexCallBinding.getOperandType(RexCallBinding.java:132)
at 

Re: Failure restarting Flink 1.5.0 job from checkpoint

2018-06-01 Thread Till Rohrmann
Thanks for reporting the issue, Ken. This does indeed look very strange, and we
need to investigate how this can happen.

Cheers,
Till

On Thu, May 31, 2018 at 8:07 PM, Ken Krugler wrote:

> Hi Aljoscha,
>
> Yes, looks that way, thanks for the issue reference. I’d checked Jira a few
> days ago; looks like FLINK-9458 was added very recently :)
>
> I’ll follow up in Jira to see if a small code snippet would be useful.
>
> — Ken
>
> > On May 31, 2018, at 1:17 AM, Aljoscha Krettek wrote:
> >
> > Hi Ken,
> >
> > I think you might have independently discovered this issue:
> > https://issues.apache.org/jira/browse/FLINK-9458
> >
> > Best,
> > Aljoscha
> >
> >> On 31. May 2018, at 01:46, Ken Krugler wrote:
> >>
> >> Hi devs,
> >>
> >> I coded up a simple iteration that uses a KeyedProcessFunction, as a
> >> way of showing how to use timers to do state iteration.
> >>
> >> This worked fine, but then I wanted to try out checkpoints. I modified
> >> the KeyedProcessFunction to throw an exception after a fixed number of
> >> calls.
> >>
> >> When this happens, it puts my job into a loop, where restarting the job
> >> fails with a NullPointerException:
> >>
> >> 18/05/30 16:38:40 DEBUG executiongraph.ExecutionGraph:1496 - Try to restart or fail the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) if no longer possible.
> >> java.lang.RuntimeException: Example of a failure triggering a job restart
> >>  at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:74)
> >>  at com.scaleunlimited.flinksnippets.examples.IterationWithProcessFunctionTimers$MyKeyedProcessFunction.processElement(IterationWithProcessFunctionTimers.java:1)
> >>  at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> >>  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> >>  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> >>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> >>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> >>  at java.lang.Thread.run(Thread.java:748)
> >> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1375 - Job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353) switched from state FAILING to RESTARTING.
> >> 18/05/30 16:38:40 INFO executiongraph.ExecutionGraph:1506 - Restarting the job Flink Streaming Job (f144fd0fb301db0ae14c7b991a25b353).
> >> 18/05/30 16:38:40 WARN executiongraph.ExecutionGraph:1273 - Failed to restart the job.
> >> java.lang.NullPointerException
> >>  at org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint.isAssignedAndAlive(CoLocationConstraint.java:104)
> >>  at org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup.resetConstraints(CoLocationGroup.java:119)
> >>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1247)
> >>  at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
> >>  at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
> >>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>  at java.lang.Thread.run(Thread.java:748)
> >>
> >> CoLocationConstraint.java:104 is this one-line function:
> >>
> >>  public boolean isAssignedAndAlive() {
> >>  return lockedLocation != null && sharedSlot.isAlive();
> >>  }
> >>
> >> So I have to assume sharedSlot is null; I don’t know if that’s valid,
> >> or if this means that the constraint is being used before setSharedSlot()
> >> is called.
> >>
> >> In any case, this same chunk of logging output repeats immediately, ad
> >> infinitum.
> >>
> >> Is there something else I should try to track down what’s going on?
> >>
> >> Thanks,
> >>
> >> — Ken
> >>
> >> PS - checkpointing is set up via:
> >>
> >>   final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
> >>   env.setParallelism(2);
> >>   env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
> >>

[jira] [Created] (FLINK-9490) Provide backwards compatibility for timer state of Flink 1.5

2018-06-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-9490:

 Summary: Provide backwards compatibility for timer state of Flink 
1.5 
 Key: FLINK-9490
 URL: https://issues.apache.org/jira/browse/FLINK-9490
 Project: Flink
  Issue Type: Sub-task
Reporter: Stefan Richter
Assignee: Stefan Richter


As we changed how timers are written to the snapshot, we also need to implement 
a backwards compatibility path that reads timers from Flink 1.5 and inserts 
them into our new timer state.





[jira] [Created] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-06-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-9489:

 Summary: Checkpoint timers as part of managed keyed state instead 
of raw keyed state
 Key: FLINK-9489
 URL: https://issues.apache.org/jira/browse/FLINK-9489
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.6.0


Timer state should now become part of the keyed state backend snapshot, i.e.,
stored inside the managed keyed state. This means that we have to connect our
preparation for asynchronous checkpoints with the backend, so that the timers
are written as part of the state for each key-group. As a consequence, the raw
keyed state is freed up, and we might expose it to user functions in the
future.





[jira] [Created] (FLINK-9488) Create common entry point for master and workers

2018-06-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9488:


 Summary: Create common entry point for master and workers
 Key: FLINK-9488
 URL: https://issues.apache.org/jira/browse/FLINK-9488
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.6.0


To make the container setup easier, we should provide a single cluster entry 
point which uses leader election to become either the master or a worker which 
runs the {{TaskManager}}.
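The idea of one entry point whose role is decided at runtime can be sketched as follows. The in-process `AtomicBoolean` election is a hypothetical stand-in: a real deployment needs a distributed leader election service (for example one backed by ZooKeeper, which Flink uses for high availability) plus handling of leadership loss, both of which this toy omits.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CommonEntryPoint {
    enum Role { MASTER, WORKER }

    /** Hypothetical stand-in for a real, distributed leader election service. */
    static final class InProcessElection {
        private final AtomicBoolean taken = new AtomicBoolean(false);

        /** Exactly one caller wins; everyone else sees false. */
        boolean tryBecomeLeader() {
            return taken.compareAndSet(false, true);
        }
    }

    /** Every container runs this same method; only the election outcome decides the role. */
    static Role start(InProcessElection election) {
        return election.tryBecomeLeader() ? Role.MASTER : Role.WORKER;
    }

    public static void main(String[] args) {
        InProcessElection election = new InProcessElection();
        for (int i = 0; i < 3; i++) {
            System.out.println("container-" + i + " -> " + start(election));
        }
    }
}
```

The container image stays identical for all instances, which is what makes the setup easier: scaling out means starting more copies of the same entry point.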





[jira] [Created] (FLINK-9487) Prepare InternalTimerHeap for asynchronous snapshots

2018-06-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-9487:

 Summary: Prepare InternalTimerHeap for asynchronous snapshots
 Key: FLINK-9487
 URL: https://issues.apache.org/jira/browse/FLINK-9487
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing, Streaming
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.6.0


When we want to snapshot timers with the keyed backend state, this must happen 
as part of an asynchronous snapshot.

The data structure {{InternalTimerHeap}} needs to offer support for this 
through a lightweight copy mechanism (e.g. arraycopy of the timer queue, 
because timers are immutable w.r.t. serialization).

We can also stop keeping the dedup maps in {{InternalTimerHeap}} separated by
key-group; all timers can go into one map.

Instead, we can implement online-partitioning as part of the asynchronous 
operation, similar to what we do in {{CopyOnWriteStateTable}} snapshots. Notice 
that in this intermediate state, the code will still run in the synchronous 
part until we are integrated with the backends for async snapshotting (next 
subtask of this jira).
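The two halves described above (a cheap copy in the synchronous part, key-group partitioning in the asynchronous part) can be sketched in plain Java. The copy is a shallow array copy, which is only a consistent snapshot because the timers are immutable. The `Timer` class is hypothetical, the heap layout (live entries in `[0, size)`) is an assumption, and `keyGroupOf` is simplified: Flink's real key-group assignment additionally applies a murmur hash to the key's hash code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TimerSnapshotSketch {
    /** Hypothetical immutable timer: sharing instances between heap and snapshot is safe. */
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    /** Simplified key-group assignment (assumption, see lead-in). */
    static int keyGroupOf(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Synchronous part: shallow O(n) copy of the live prefix of the heap array. */
    static Timer[] copyQueue(Timer[] heapArray, int size) {
        return Arrays.copyOf(heapArray, size);
    }

    /** Asynchronous part: partition the copy by key-group while writing the snapshot. */
    static List<List<Timer>> partitionByKeyGroup(Timer[] snapshot, int maxParallelism) {
        List<List<Timer>> groups = new ArrayList<>(maxParallelism);
        for (int g = 0; g < maxParallelism; g++) {
            groups.add(new ArrayList<>());
        }
        for (Timer timer : snapshot) {
            groups.get(keyGroupOf(timer.key, maxParallelism)).add(timer);
        }
        return groups;
    }
}
```

Because partitioning happens on the copy, the single dedup map in the live heap no longer needs to be split by key-group.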





[jira] [Created] (FLINK-9486) Introduce TimerState in keyed state backend

2018-06-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-9486:

 Summary: Introduce TimerState in keyed state backend
 Key: FLINK-9486
 URL: https://issues.apache.org/jira/browse/FLINK-9486
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
 Fix For: 1.6.0


This is the first implementation subtask.

The goal of this PR is to introduce a timer state that is registered with the
keyed state backend, similar to other forms of keyed state.

For the {{HeapKeyedStateBackend}}, this state lives on the same level as the
{{StateTable}} instances that hold other forms of keyed state, and the
implementation is basically backed by {{InternalTimerHeap}}.

For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this
state outside of RocksDB, based on {{InternalTimerHeap}}. This is an
intermediate state; we will later also implement the alternative of storing
the timers inside a column family in RocksDB. However, by taking this step,
we could still offer the option of combining RocksDB state with heap-based
timers.
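For readers unfamiliar with what a heap-backed timer state holds, here is a minimal stand-in in plain Java: a priority queue ordered by timestamp plus a set that deduplicates (timestamp, key) registrations. It is an illustration under simplifying assumptions, not `InternalTimerHeap` itself, which also tracks namespaces and key-groups.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

class HeapTimerState {
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer t = (Timer) o;
            return timestamp == t.timestamp && key.equals(t.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }
    }

    private final PriorityQueue<Timer> queue = new PriorityQueue<>();
    private final Set<Timer> dedup = new HashSet<>();

    /** Registers a timer; returns false if an identical timer already exists. */
    boolean register(long timestamp, String key) {
        Timer timer = new Timer(timestamp, key);
        if (!dedup.add(timer)) {
            return false;
        }
        queue.add(timer);
        return true;
    }

    /** Removes and returns the earliest timer with timestamp <= watermark, or null. */
    Timer pollExpired(long watermark) {
        Timer head = queue.peek();
        if (head == null || head.timestamp > watermark) {
            return null;
        }
        queue.poll();
        dedup.remove(head);
        return head;
    }
}
```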





[jira] [Created] (FLINK-9485) Improving Flink’s timer management for large state

2018-06-01 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-9485:

 Summary: Improving Flink’s timer management for large state
 Key: FLINK-9485
 URL: https://issues.apache.org/jira/browse/FLINK-9485
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing, Streaming
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.6.0


See 
https://docs.google.com/document/d/1XbhJRbig5c5Ftd77d0mKND1bePyTC26Pz04EvxdA7Jc/edit#heading=h.17v0k3363r6q





[TABLE][SQL] Unify UniqueKeyExtractor and DataStreamRetractionRules

2018-06-01 Thread Piotr Nowojski
Hi,

Recently I was looking into upserts and upsert sources in Flink, and while
doing so I noticed some potential room for improvement/simplification.

Currently, the preferred update mode is set by 3 optimiser rules in
DataStreamRetractionRules that work in three stages, followed by the
UniqueKeyExtractor plan node visitor, which validates that the keys for upserts
are correct. First, DataStreamRetractionRules sets up UpdateAsRetractionTrait;
next, another rule uses it to set up AccModeTrait. AccModeTrait has only two
values: Acc (upserts) or AccRetract (retractions). This has some severe
limitations and requires the additional UniqueKeyExtractor stage (implemented
as a visitor) to actually verify that keys are set correctly.

I would propose to unify those into one visitor (probably a RelShuttle
implementation) that would traverse the plan from the root to the leaves. On
the way down it would collect the preferences of the nodes regarding the update
mode (including keys for upserts). On the way up it would pick the
upsert(keys)/retraction/append-only mode, or fail if that is impossible [1].

I think that would simplify the code by a noticeable margin. Instead of having
this logic distributed among 4 classes in two files/independent steps, it would
be in one simple class.

It would also open up the possibility of further improvements. For operators
that can process both upserts and retractions (with the solution mentioned
above, which decides upsert vs. retract in the same step as validating keys),
we could choose upserts if the keys match and fall back to retractions only if
they don't. Currently this isn't possible (examples [2a], [2b]).

Thanks,
Piotrek

[1] Example impossible case:

DataStream<Tuple3<Integer, Long, String>> ds1 = JavaStreamTestData.getSmall3TupleDataSet(env);
Table t1 = tableEnv.fromDataStream(ds1, "a,b,c").select("a.cast(LONG) as a,b,c");

DataStream<Tuple3<Integer, Long, String>> ds2 = JavaStreamTestData.getSmall3TupleDataSet(env);
Table t2 = tableEnv.fromDataStream(ds2, "a,b,c");

Table g1 = t1.groupBy("a").select("a, b.count");
Table g2 = t2.groupBy("b").select("a.count as a, b");

g1.unionAll(g2).writeToSink(new TestUpsertSink(new String[]{("a")}, false));

[2a]

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("y").select("x.count, y")

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from both sides. Currently both are
retractions.

[2b]

val t1 = util.addTable[(Long, Long)]('a, 'b)
val t2 = util.addTable[(Long, Long)]('x, 'y)

val g1 = t1.groupBy("a").select("a, b.count")
val g2 = t2.groupBy("x").select("x, y.count as y")

val resultTable = g1.join(g2, "a=y")

`g1.join(g2, "a=y")` could accept upserts from g1 (same key column) but only
retractions from g2 (different key columns). Currently both are retractions.
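The proposed single pass can be sketched with a toy plan model: requested upsert keys flow from parent to child on the way down, and each node picks APPEND, UPSERT, or RETRACT on the way back up. Everything here (the `Node` class, the mode names, and the rule "upsert only if the requested key equals the node's unique key") is a hypothetical simplification, not the Table API planner; the failure case from [1] is not modeled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class UpdateModeResolver {
    enum Mode { APPEND, UPSERT, RETRACT }

    /** Heavily simplified plan node; not Flink's RelNode. */
    static final class Node {
        final String name;
        final boolean producesUpdates;
        final Set<String> uniqueKey;               // key of the updates this node emits (may be empty)
        final List<Set<String>> upsertKeyPerInput; // per input: key on which upserts are acceptable
        final List<Node> inputs;

        Node(String name, boolean producesUpdates, Set<String> uniqueKey,
             List<Set<String>> upsertKeyPerInput, List<Node> inputs) {
            this.name = name;
            this.producesUpdates = producesUpdates;
            this.uniqueKey = uniqueKey;
            this.upsertKeyPerInput = upsertKeyPerInput;
            this.inputs = inputs;
        }
    }

    static Set<String> key(String... columns) {
        return new HashSet<>(Arrays.asList(columns));
    }

    static Node source(String name) {
        return new Node(name, false, key(), Collections.<Set<String>>emptyList(), Collections.<Node>emptyList());
    }

    static Node groupBy(String name, Set<String> groupKey, Node input) {
        return new Node(name, true, groupKey, Collections.<Set<String>>emptyList(), Collections.singletonList(input));
    }

    static Node join(String name, Set<String> leftKey, Node left, Set<String> rightKey, Node right) {
        return new Node(name, true, key(), Arrays.asList(leftKey, rightKey), Arrays.asList(left, right));
    }

    /** Single traversal: preferences flow down, chosen modes are recorded on the way back up. */
    static Map<String, Mode> resolve(Node root) {
        Map<String, Mode> modes = new LinkedHashMap<>();
        visit(root, key(), modes);
        return modes;
    }

    private static void visit(Node node, Set<String> requestedKey, Map<String, Mode> modes) {
        for (int i = 0; i < node.inputs.size(); i++) {
            // Down: tell each input on which key (if any) this node could consume upserts.
            Set<String> request = node.upsertKeyPerInput.isEmpty() ? key() : node.upsertKeyPerInput.get(i);
            visit(node.inputs.get(i), request, modes);
        }
        // Up: upsert only if the parent asked for exactly this node's key, else retract.
        Mode mode;
        if (!node.producesUpdates) {
            mode = Mode.APPEND;
        } else if (!requestedKey.isEmpty() && requestedKey.equals(node.uniqueKey)) {
            mode = Mode.UPSERT;
        } else {
            mode = Mode.RETRACT;
        }
        modes.put(node.name, mode);
    }

    public static void main(String[] args) {
        Node g1 = groupBy("g1", key("a"), source("t1"));
        Node g2 = groupBy("g2", key("x"), source("t2")); // shape of example [2b]
        System.out.println(resolve(join("join", key("a"), g1, key("y"), g2)));
    }
}
```

With the key choices from [2a] (g2 grouped on y), both g1 and g2 resolve to UPSERT; with [2b] (g2 grouped on x) only g1 does, and g2 falls back to RETRACT, matching the behavior described above.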




Re: [SQL] [CALCITE] not applicable function for TIME

2018-06-01 Thread Viktor Vlasov
Thank you:)

2018-06-01 13:18 GMT+03:00 Fabian Hueske :

> Done :-)
>
> 2018-06-01 12:12 GMT+02:00 Viktor Vlasov :
>
> > Could someone assign me to the created
> > https://issues.apache.org/jira/browse/FLINK-9482?
> >
> > 2018-05-31 17:48 GMT+03:00 Viktor Vlasov  >:
> >
> > > Thank you Shuyi, I will investigate these issues.
> > >
> > > 2018-05-31 17:42 GMT+03:00 Shuyi Chen :
> > >
> > >> I think you might find some context in these 2 PRs in Flink & Calcite
> > >> respectively:
> > >>
> > >> https://issues.apache.org/jira/browse/CALCITE-1987
> > >> https://issues.apache.org/jira/browse/FLINK-7934
> > >>
> > >> We have different EXTRACT implementation paths in Calcite and Flink.
> > >> Hope it helps.
> > >>
> > >> On Thu, May 31, 2018 at 7:13 AM, Viktor Vlasov <viktorvlasovsiber...@gmail.com> wrote:
> > >>
> > >> > Thank you for the quick response. OK, I'll do it.
> > >> >
> > >> >
> > >> > 2018-05-31 17:01 GMT+03:00 Fabian Hueske :
> > >> >
> > >> > > Hi Viktor,
> > >> > >
> > >> > > Welcome to the Flink dev mailing list!
> > >> > > You are certainly right, this is unexpected behavior and IMO we
> > >> > > should fix this.
> > >> > >
> > >> > > It would be great if you could open a JIRA issue for that and
> > >> > > maybe also dig a bit into the issue to figure out why this happens.
> > >> > >
> > >> > > Thank you,
> > >> > > Fabian
> > >> > >
> > >> > > 2018-05-31 15:53 GMT+02:00 Viktor Vlasov <viktorvlasovsiber...@gmail.com>:
> > >> > >
> > >> > > > Hi there!
> > >> > > >
> > >> > > > First of all, I want to thank you for your time and effort on
> > >> > > > this project.
> > >> > > >
> > >> > > > I am a software engineer with almost 3 years of experience, most
> > >> > > > of the time working with Java-related technologies.
> > >> > > >
> > >> > > > Recently I started considering contributing to Flink. To begin,
> > >> > > > I chose this issue: https://issues.apache.org/jira/browse/FLINK-9432.
> > >> > > >
> > >> > > > After implementing it, I ran into an interesting question. When I
> > >> > > > was deciding which tests to create for the function DECADE in the
> > >> > > > class
> > >> > > > org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala,
> > >> > > > I found that functions such as CENTURY and MILLENNIUM work with
> > >> > > > the TIME type without problems. Here are some examples:
> > >> > > > EXTRACT(CENTURY FROM TIME '00:00:00') - returns 0
> > >> > > > EXTRACT(MILLENNIUM FROM TIME '00:00:00') - returns 0
> > >> > > >
> > >> > > > In my opinion this is strange: a time is not a date, so how can we
> > >> > > > extract such things from it?
> > >> > > >
> > >> > > > Meanwhile, when I try to use similar logic in Calcite, an error
> > >> > > > occurs. Here is an example:
> > >> > > > SELECT EXTRACT(CENTURY FROM TIME '00:00:00');
> > >> > > > throws `java.lang.AssertionError: unexpected TIME`
> > >> > > >
> > >> > > > Is it necessary to create a separate issue for that?
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> "So you have to trust that the dots will somehow connect in your
> > >> future."
> > >>
> > >
> > >
> >
>
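For comparison only: Java's own `java.time` API draws the same line the thread is discussing, since a time-of-day value has no year field, and asking for one is an error rather than 0. This is an analogy in plain Java, not how Flink or Calcite implement EXTRACT.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.UnsupportedTemporalTypeException;

class ExtractFromTime {
    /** True if the value carries a year, which CENTURY/MILLENNIUM would be derived from. */
    static boolean supportsYear(TemporalAccessor value) {
        return value.isSupported(ChronoField.YEAR);
    }

    public static void main(String[] args) {
        System.out.println(supportsYear(LocalDate.of(2018, 6, 1))); // true
        LocalTime midnight = LocalTime.of(0, 0);
        System.out.println(supportsYear(midnight)); // false
        try {
            midnight.get(ChronoField.YEAR);
        } catch (UnsupportedTemporalTypeException e) {
            // java.time rejects the request instead of returning 0.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```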


Re: [SQL] [CALCITE] not applicable function for TIME

2018-06-01 Thread Fabian Hueske
Done :-)

2018-06-01 12:12 GMT+02:00 Viktor Vlasov :

> Could someone assign me to the created
> https://issues.apache.org/jira/browse/FLINK-9482?
>
> 2018-05-31 17:48 GMT+03:00 Viktor Vlasov :
>
> > Thank you Shuyi, I will investigate these issues.
> >
> > 2018-05-31 17:42 GMT+03:00 Shuyi Chen :
> >
> >> I think you might find some context in these 2 PRs in Flink & Calcite
> >> respectively:
> >>
> >> https://issues.apache.org/jira/browse/CALCITE-1987
> >> https://issues.apache.org/jira/browse/FLINK-7934
> >>
> >> We have different EXTRACT implementation paths in Calcite and Flink.
> Hope
> >> it helps.
> >>
> >> On Thu, May 31, 2018 at 7:13 AM, Viktor Vlasov <
> >> viktorvlasovsiber...@gmail.com> wrote:
> >>
> >> > Thank you for quick response, ok, I'll do it
> >> >
> >> >
> >> > 2018-05-31 17:01 GMT+03:00 Fabian Hueske :
> >> >
> >> > > Hi Viktor,
> >> > >
> >> > > Welcome to the Flink dev mailing list!
> >> > > You are certainly right, this is an unexpected behavior and IMO we
> >> should
> >> > > fix this.
> >> > >
> >> > > It would be great if you could open a JIRA issue for that and maybe
> >> also
> >> > > dig a bit into the issue to figure out why this happens.
> >> > >
> >> > > Thank you,
> >> > > Fabian
> >> > >
> >> > > 2018-05-31 15:53 GMT+02:00 Viktor Vlasov <
> >> viktorvlasovsiber...@gmail.com
> >> > >:
> >> > >
> >> > > > Hi there!​
> >> > > >
> >> > > > First of all I want to thank you for your time and efforts about
> >> this
> >> > > > project.
> >> > > >
> >> > > > I am Software Engineer with almost 3 years experience, most of the
> >> > time I
> >> > > > work with Java related technologies.
> >> > > >
> >> > > > Recently I have started to consider possibility to contribute to
> >> Flink.
> >> > > > For begin I chose this issue: https://issues.apache.org/
> >> > > > jira/browse/FLINK-9432.
> >> > > >
> >> > > > After implementation I have faced with an interesting question.
> >> When I
> >> > > was
> >> > > > trying to decide what tests to create for the function DECADE in
> >> class
> >> > > > org/apache/flink/table/expressions/validation/
> >> > > > ScalarFunctionsValidationTest.scala
> >> > > > I've figured out that such functions as CENTURY and MILLENNIUM
> work
> >> > with
> >> > > > TIME type without problems.  Here an examples:
> >> > > > EXTRACT(CENTURY FROM TIME '00:00:00') - returns 0
> >> > > > EXTRACT(MILLENNIUM FROM TIME '00:00:00') - returns 0
> >> > > >
> >> > > > This seems strange to me: a time is not a date, so how can we
> >> > > > extract such things from it?
> >> > > >
> >> > > > Meanwhile, when I try the same logic in Calcite, an error occurs.
> >> > > > Here is an example:
> >> > > > SELECT EXTRACT(CENTURY FROM TIME '00:00:00');
> >> > > > throws `java.lang.AssertionError: unexpected TIME`
> >> > > >
> >> > > > Is it necessary to create a separate issue for that?
> >> > > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> "So you have to trust that the dots will somehow connect in your future."
> >>
> >
> >
>


Re: [SQL] [CALCITE] not applicable function for TIME

2018-06-01 Thread Viktor Vlasov
Could someone assign the newly created
https://issues.apache.org/jira/browse/FLINK-9482 to me?

2018-05-31 17:48 GMT+03:00 Viktor Vlasov :

> Thank you Shuyi, I will investigate these issues.
>
> 2018-05-31 17:42 GMT+03:00 Shuyi Chen :
>
>> I think you might find some context in these 2 JIRA issues in Calcite & Flink
>> respectively:
>>
>> https://issues.apache.org/jira/browse/CALCITE-1987
>> https://issues.apache.org/jira/browse/FLINK-7934
>>
>> We have different EXTRACT implementation paths in Calcite and Flink. Hope
>> it helps.
>>
>> On Thu, May 31, 2018 at 7:13 AM, Viktor Vlasov <
>> viktorvlasovsiber...@gmail.com> wrote:
>>
>> > Thank you for the quick response. OK, I'll do it.
>> >
>> >
>> > 2018-05-31 17:01 GMT+03:00 Fabian Hueske :
>> >
>> > > Hi Viktor,
>> > >
>> > > Welcome to the Flink dev mailing list!
>> > > You are certainly right, this is unexpected behavior and IMO we should
>> > > fix this.
>> > >
>> > > It would be great if you could open a JIRA issue for that and maybe also
>> > > dig a bit into the issue to figure out why this happens.
>> > >
>> > > Thank you,
>> > > Fabian
>> > >
>> > > 2018-05-31 15:53 GMT+02:00 Viktor Vlasov <viktorvlasovsiber...@gmail.com>:
>> > >
>> > > > Hi there!
>> > > >
>> > > > First of all, I want to thank you for your time and effort on this
>> > > > project.
>> > > >
>> > > > I am a software engineer with almost 3 years of experience, most of
>> > > > the time working with Java-related technologies.
>> > > >
>> > > > Recently I have started to consider contributing to Flink.
>> > > > To begin, I chose this issue:
>> > > > https://issues.apache.org/jira/browse/FLINK-9432.
>> > > >
>> > > > After implementing it, I ran into an interesting question. While
>> > > > deciding what tests to write for the DECADE function in class
>> > > > org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala,
>> > > > I found that functions such as CENTURY and MILLENNIUM work with the
>> > > > TIME type without problems. Here are some examples:
>> > > > EXTRACT(CENTURY FROM TIME '00:00:00') - returns 0
>> > > > EXTRACT(MILLENNIUM FROM TIME '00:00:00') - returns 0
>> > > >
>> > > > This seems strange to me: a time is not a date, so how can we
>> > > > extract such things from it?
>> > > >
>> > > > Meanwhile, when I try the same logic in Calcite, an error occurs.
>> > > > Here is an example:
>> > > > SELECT EXTRACT(CENTURY FROM TIME '00:00:00');
>> > > > throws `java.lang.AssertionError: unexpected TIME`
>> > > >
>> > > > Is it necessary to create a separate issue for that?
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> "So you have to trust that the dots will somehow connect in your future."
>>
>
>
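To illustrate the point raised in this thread (a TIME value has no date component, so year-derived units such as CENTURY, MILLENNIUM and DECADE have nothing to extract from), here is a minimal Java sketch using `java.time`. It is not Flink's or Calcite's validation code; the class `TimeExtractDemo` and the helper `extractFromTime` are hypothetical. Since `ChronoField` has no CENTURY or MILLENNIUM field, `YEAR` stands in as the base field those units would be derived from.

```java
import java.time.LocalTime;
import java.time.temporal.ChronoField;

public class TimeExtractDemo {

    /**
     * Hypothetical validation helper: returns the requested field of a
     * TIME value, or rejects fields that need a date component.
     */
    static int extractFromTime(ChronoField field, LocalTime time) {
        // LocalTime only supports time-based fields (HOUR_OF_DAY, MINUTE_OF_HOUR, ...);
        // date-based fields such as YEAR report as unsupported.
        if (!time.isSupported(field)) {
            throw new IllegalArgumentException(
                "EXTRACT(" + field.name() + " FROM TIME) is not applicable: TIME has no date component");
        }
        return time.get(field);
    }

    public static void main(String[] args) {
        LocalTime midnight = LocalTime.of(0, 0, 0);
        System.out.println(extractFromTime(ChronoField.HOUR_OF_DAY, midnight));
        try {
            // YEAR is the field CENTURY/MILLENNIUM/DECADE would be derived from.
            extractFromTime(ChronoField.YEAR, midnight);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at validation time, as sketched here, would match Calcite's behavior of rejecting the expression rather than Flink's current behavior of returning 0.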


Re: [VOTE] Release flink-shaded 4.0, release candidate #2

2018-06-01 Thread Tzu-Li (Gordon) Tai
+1

- Checked signatures and hashes
- Source builds successfully
- Staged source release files are ASF compliant: they do not contain md5
files and have the correct sha file names.
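The hash check in the list above can be sketched in plain Java. This is a minimal sketch under stated assumptions: the checksum is assumed to sit in an `<artifact>.sha512` file in `sha512sum` format (`<hex>  <name>`), and the class `Sha512Check` is hypothetical. Signature verification is a separate step, usually done with `gpg --verify`, and is not covered here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Sha512Check {

    /** Hex-encodes the SHA-512 digest of a file, reading it in chunks. */
    static String sha512Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-512");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest()) {
            sb.append(String.format("%02x", b)); // %x treats negative bytes as unsigned
        }
        return sb.toString();
    }

    /** Assumes the checksum file starts with the hex digest, optionally followed by the file name. */
    static boolean matches(Path artifact, Path shaFile) throws IOException, NoSuchAlgorithmException {
        String expected = Files.readString(shaFile, StandardCharsets.UTF_8).trim().split("\\s+")[0];
        return expected.equalsIgnoreCase(sha512Hex(artifact));
    }

    public static void main(String[] args) throws Exception {
        Path artifact = Path.of(args[0]);
        Path shaFile = Path.of(args[0] + ".sha512"); // assumed naming convention
        System.out.println(matches(artifact, shaFile) ? "hash OK" : "hash MISMATCH");
    }
}
```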

Cheers,
Gordon

On Fri, Jun 1, 2018 at 12:45 AM, Yaz Sh  wrote:

> +1
>
> I am new to this, but I did the following tests:
>
> - mvn clean package -Dshade-sources passed with all modules SUCCESS
> - checked all the licenses available
> - netty-route license has been removed
> - netty version 4.1.24.Final-4.0
> - jackson version 2.7.9-4.0
> - jackson-jsonSchema version 2.7.9-4.0
> - guava version 18.0-4.0
> - asm version 5.0.4-4.0
> - flink force shading version 4.0
> - flink-shaded-netty-4/packaged_licenses/LICENSE.jauter.txt removed
> - flink-shaded-netty-4/packaged_licenses/LICENSE.netty_router.txt removed
>
>
> Cheers,
> Yaz
>
> > On May 30, 2018, at 10:29 AM, Piotr Nowojski wrote:
> >
> > I’m carrying over my +1
> >
> >> On 30 May 2018, at 15:57, Chesnay Schepler  wrote:
> >>
> >> +1
> >>
> >> The checks I did for the previous RC are still valid.
> >> Additionally, I verified that the netty jar no longer bundles the
> >> licenses for netty-router.
> >>
> >> On 30.05.2018 15:51, Chesnay Schepler wrote:
> >>> Hi everyone,
> >>> Please review and vote on the release candidate #2 for the version 4.0,
> >>> as follows:
> >>> [ ] +1, Approve the release
> >>> [ ] -1, Do not approve the release (please provide specific comments)
> >>>
> >>> The complete staging area is available for your review, which includes:
> >>> * GitHub release notes [1],
> >>> * the official Apache source release to be deployed to dist.apache.org [2],
> >>> which are signed with the key with fingerprint 11D464BA [3],
> >>> * all artifacts to be deployed to the Maven Central Repository [4],
> >>> * source code tag "release-4.0-rc2" [5].
> >>>
> >>> The vote will be open for at least 72 hours. It is adopted by majority
> >>> approval, with at least 3 PMC affirmative votes.
> >>>
> >>> Thanks,
> >>> Chesnay
> >>>
> >>> [1] https://github.com/apache/flink-shaded/milestone/4?closed=1
> >>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-4.0-rc2/
> >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> >>> [4] https://repository.apache.org/content/repositories/orgapacheflink-1167/
> >>> [5] https://gitbox.apache.org/repos/asf?p=flink-shaded.git;a=commit;h=33e86998fb8fe3d5b0a20260110d4ea636cc2528
> >>>
> >>>
> >>
> >
>
>
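The adoption rule quoted in the RC announcement ("majority approval, with at least 3 PMC affirmative votes") can be written as a small predicate. This is a sketch under the assumption that both the majority and the minimum are counted over binding (PMC) votes only; the class `ReleaseVote` and its method are hypothetical, and which voters in this thread are PMC members is not stated here.

```java
public class ReleaseVote {

    /**
     * Hypothetical check of the rule in the RC announcement: at least 3 binding +1
     * votes, and more binding +1 than binding -1 votes (majority approval).
     */
    static boolean passes(int bindingPlusOne, int bindingMinusOne) {
        return bindingPlusOne >= 3 && bindingPlusOne > bindingMinusOne;
    }

    public static void main(String[] args) {
        System.out.println(passes(3, 0)); // true
        System.out.println(passes(2, 0)); // false: fewer than 3 binding +1 votes
        System.out.println(passes(3, 4)); // false: no majority
    }
}
```

The 72-hour minimum voting window is a separate, time-based condition and is left out of the sketch.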