[jira] [Assigned] (FLINK-9575) Potential race condition when removing JobGraph in HA
[ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dominik Wosiński reassigned FLINK-9575:
---------------------------------------

    Assignee: Dominik Wosiński

> Potential race condition when removing JobGraph in HA
> ------------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: Dominik Wosiński
>            Priority: Critical
>             Fix For: 1.6.0
>
> When we remove the _JobGraph_ from the _JobManager_, for example after invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }
> {noformat}
> So the job graph is removed asynchronously, while the BLOB files associated with the job are removed synchronously. As far as I understand, this means we can fail to remove the job graph from _submittedJobGraphs_ after the BLOBs are already gone. If the JobManager then fails and a new leader is elected, the new leader can try to recover the job, but recovery will fail with an exception since the BLOBs the job graph refers to have already been removed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
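The interleaving described above is easier to see stripped to its essentials. Below is a minimal Java sketch of the pattern; the interfaces and method names are illustrative stand-ins, not the actual JobManager classes:

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical reduction of the sequence described in the report: the job
// graph is removed asynchronously while the BLOBs are deleted synchronously,
// so the BLOBs can already be gone when removeJobGraph() later fails.
class RemovalRaceSketch {

    void removeJob(String jobId, JobGraphStore submittedJobGraphs, BlobStore blobServer) {
        CompletableFuture.runAsync(() -> {
            // may fail (e.g. a ZooKeeper hiccup) AFTER the blobs below are gone
            submittedJobGraphs.removeJobGraph(jobId);
        });
        // runs immediately, without waiting for the future above
        blobServer.cleanupJob(jobId);
        // if removeJobGraph() failed, a newly elected leader still sees the
        // job graph and tries to recover it -- but its blobs no longer exist
    }

    interface JobGraphStore { void removeJobGraph(String jobId); }
    interface BlobStore { void cleanupJob(String jobId); }
}
{code}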
[jira] [Closed] (FLINK-9632) SlotPool should notify the caller when allocateSlot meet an exception
[ https://issues.apache.org/jira/browse/FLINK-9632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shuai.xu closed FLINK-9632.
---------------------------

    Resolution: Invalid

> SlotPool should notify the caller when allocateSlot meet an exception
> ----------------------------------------------------------------------
>
>                 Key: FLINK-9632
>                 URL: https://issues.apache.org/jira/browse/FLINK-9632
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>            Priority: Major
>              Labels: flip-6
>
> In SlotPool, allocateSlot() returns a CompletableFuture, but this future is only completed when slotAndLocalityFuture returns a LogicalSlot. If slotAndLocalityFuture completes exceptionally, the returned future is never completed, so the caller never learns about the failure.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
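A minimal sketch of the behavior the report asks for, with hypothetical names rather than the real SlotPool API (String stands in for the slot type): the outer future must be completed exceptionally when the inner one fails, otherwise callers wait forever.

{code:java}
import java.util.concurrent.CompletableFuture;

class SlotAllocationSketch {

    // The outer future is completed either way, so the caller is always notified.
    CompletableFuture<String> allocateSlot(CompletableFuture<String> slotAndLocalityFuture) {
        CompletableFuture<String> result = new CompletableFuture<>();
        slotAndLocalityFuture.whenComplete((slot, failure) -> {
            if (failure != null) {
                // forward the failure instead of leaving 'result' incomplete
                result.completeExceptionally(failure);
            } else {
                result.complete(slot);
            }
        });
        return result;
    }
}
{code}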
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518905#comment-16518905 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197008798

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java ---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
             return;
         }

-        ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
-
-        R request;
-        if (isFileUpload()) {
-            final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-            if (path == null) {
-                HandlerUtils.sendErrorResponse(
-                    ctx,
-                    httpRequest,
-                    new ErrorResponseBody("Client did not upload a file."),
-                    HttpResponseStatus.BAD_REQUEST,
-                    responseHeaders);
-                return;
-            }
-            //noinspection unchecked
-            request = (R) new FileUpload(path);
-        } else if (msgContent.capacity() == 0) {
-            try {
-                request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-            } catch (JsonParseException | JsonMappingException je) {
-                log.error("Request did not conform to expected format.", je);
-                HandlerUtils.sendErrorResponse(
-                    ctx,
-                    httpRequest,
-                    new ErrorResponseBody("Bad request received."),
-                    HttpResponseStatus.BAD_REQUEST,
-                    responseHeaders);
-                return;
+        final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+
+        try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+            R request;
+            if (msgContent.capacity() == 0) {
+                try {
+                    request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+                } catch (JsonParseException | JsonMappingException je) {
+                    log.error("Request did not conform to expected format.", je);
+                    HandlerUtils.sendErrorResponse(
+                        ctx,
+                        httpRequest,
+                        new ErrorResponseBody("Bad request received."),
+                        HttpResponseStatus.BAD_REQUEST,
+                        responseHeaders);
+                    return;
+                }
+            } else {
+                try {
+                    ByteBufInputStream in = new ByteBufInputStream(msgContent);
--- End diff --

I would suggest using try-with-resources to make sure `in` is closed.

> Implement generic mechanism to receive files via rest
> ------------------------------------------------------
>
>                 Key: FLINK-9599
>                 URL: https://issues.apache.org/jira/browse/FLINK-9599
>             Project: Flink
>          Issue Type: New Feature
>          Components: REST
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
> * extend the RestClient to allow the upload of Files
> * extend FileUploadHandler to accept mixed multi-part requests (json + files)
> * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute,
> s

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197008798

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java ---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
             return;
         }

-        ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
-
-        R request;
-        if (isFileUpload()) {
-            final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-            if (path == null) {
-                HandlerUtils.sendErrorResponse(
-                    ctx,
-                    httpRequest,
-                    new ErrorResponseBody("Client did not upload a file."),
-                    HttpResponseStatus.BAD_REQUEST,
-                    responseHeaders);
-                return;
-            }
-            //noinspection unchecked
-            request = (R) new FileUpload(path);
-        } else if (msgContent.capacity() == 0) {
-            try {
-                request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-            } catch (JsonParseException | JsonMappingException je) {
-                log.error("Request did not conform to expected format.", je);
-                HandlerUtils.sendErrorResponse(
-                    ctx,
-                    httpRequest,
-                    new ErrorResponseBody("Bad request received."),
-                    HttpResponseStatus.BAD_REQUEST,
-                    responseHeaders);
-                return;
+        final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+
+        try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+            R request;
+            if (msgContent.capacity() == 0) {
+                try {
+                    request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+                } catch (JsonParseException | JsonMappingException je) {
+                    log.error("Request did not conform to expected format.", je);
+                    HandlerUtils.sendErrorResponse(
+                        ctx,
+                        httpRequest,
+                        new ErrorResponseBody("Bad request received."),
+                        HttpResponseStatus.BAD_REQUEST,
+                        responseHeaders);
+                    return;
+                }
+            } else {
+                try {
+                    ByteBufInputStream in = new ByteBufInputStream(msgContent);
--- End diff --

I would suggest using try-with-resources to make sure `in` is closed.

---
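The suggested pattern in a minimal, self-contained form. ByteArrayInputStream stands in for Netty's ByteBufInputStream so the sketch carries no Netty dependency, and the surrounding error handling from the PR is omitted; this illustrates the pattern rather than the final code:

{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class TryWithResourcesSketch {

    // Opening the stream in a try-with-resources header guarantees close()
    // runs even if parsing (or any other call in the body) throws.
    static Object readRequest(byte[] msgContent) throws IOException {
        try (InputStream in = new ByteArrayInputStream(msgContent)) {
            // parse the request from 'in' (MAPPER.readValue(...) in the PR)
            return in.read(); // placeholder for the actual parsing
        } // 'in' is closed here automatically, on both success and failure
    }
}
{code}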
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518905#comment-16518905 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197008798

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

If we called `getInternal()` from `get()` and made `updateTs()` accept an `Iterable`, then this method could seemingly be removed. (Or, at the least, we should check whether the `iterable` is already a `List`; if so, we could cast it and return immediately.)

> Create wrapper with TTL logic for value state
> ---------------------------------------------
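A minimal sketch of the reviewer's second suggestion, as a standalone method rather than the actual PR code: short-circuit when the Iterable is already a List instead of always copying it element by element.

{code:java}
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class CollectSketch {

    @SuppressWarnings("unchecked")
    static <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            // already materialized -- no need to copy
            return (List<E>) iterable;
        }
        return StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());
    }
}
{code}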
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518893#comment-16518893 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197001110

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+    extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+    implements InternalReducingState<K, N, T> {
+    TtlReducingState(
+        InternalReducingState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<T> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public T get() throws Exception {
+        return getInternal();
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        original.add(wrapWithTs(value, Long.MAX_VALUE));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces.

> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> TTL state decorator uses original state with packed TTL and adds TTL logic
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>     ValueState<TtlValue<V>> underlyingState;
>     InternalTimeService timeProvider;
>     V value() {
>         TtlValue<V> valueWithTtl = underlyingState.get();
>         // ttl logic here (e.g. update timestamp)
>         return valueWithTtl.getValue();
>     }
>     void update() { ... underlyingState.update(valueWithTtl) ... }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its TTL
> wrapper from FLINK-9513

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
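One possible way to realize the reviewer's suggestion, as a hedged sketch only: all method names here are stand-ins for the PR's internals, and whether a post-merge check is the right place for TTL enforcement is exactly what the comment puts up for discussion.

{code:java}
import java.util.Collection;

abstract class TtlMergeSketch<N, V> {
    // stand-ins for the wrapped state and the TTL predicate from the PR
    abstract void mergeUnderlying(N target, Collection<N> sources) throws Exception;
    abstract V getMerged(N target) throws Exception;
    abstract boolean expired(V value);
    abstract void clearNamespace(N target);

    void mergeNamespaces(N target, Collection<N> sources) throws Exception {
        mergeUnderlying(target, sources);
        // TTL check after the merge, so that merging namespaces
        // does not resurrect already-expired state:
        V merged = getMerged(target);
        if (merged != null && expired(merged)) {
            clearNamespace(target);
        }
    }
}
{code}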
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518892#comment-16518892 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196996839

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
--- End diff --

In this block we iterate `ttlValue` twice: once for `collect()` and once for `updateTs()`. If we made `updateTs()` accept an `Iterable` as its argument, we could avoid the `collect()` here; that way we only need to iterate `ttlValue` once.

> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> TTL state decorator uses original state with packed TTL and adds TTL logic
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>     ValueState<TtlValue<V>> underlyingState;
>     InternalTimeService timeProvider;
>     V value() {
>         TtlValue<V> valueWithTtl = underlyingState.get();
>         // ttl logic here (e.g. update timestamp)
>         return valueWithTtl.getValue();
>     }
>     void update() { ... underlyingState.update(valueWithTtl) ... }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its TTL
> wrapper from FLINK-9513

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
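A sketch of the single-pass variant the comment asks for. The abstract methods stand in for the PR's helpers (`expired()`, `rewrapWithNewTs()`, the write-back to `original`), so this shows the shape of the change under those assumptions, not the final code:

{code:java}
import java.util.ArrayList;
import java.util.List;

abstract class SinglePassUpdateSketch<T> {
    abstract boolean expired(T value);
    abstract T rewrapWithNewTs(T value);
    abstract void writeBack(List<T> values) throws Exception;

    // One loop both filters expired entries and rewraps the survivors,
    // instead of a collect() pass followed by a second stream pass.
    void updateTs(Iterable<T> values) throws Exception {
        List<T> unexpiredWithUpdatedTs = new ArrayList<>();
        for (T v : values) { // single pass over the input
            if (!expired(v)) {
                unexpiredWithUpdatedTs.add(rewrapWithNewTs(v));
            }
        }
        if (!unexpiredWithUpdatedTs.isEmpty()) {
            writeBack(unexpiredWithUpdatedTs);
        }
    }
}
{code}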
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518898#comment-16518898 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197001339

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState<K, N, IN, ACC, OUT>
+    extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
+    implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+
+    TtlAggregatingState(
+        InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<ACC> valueSerializer,
+        TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        super(originalState, config, timeProvider, valueSerializer);
+        aggregateFunction.stateClear = originalState::clear;
+        aggregateFunction.updater = originalState::updateInternal;
+    }
+
+    @Override
+    public OUT get() throws Exception {
+        return original.get();
+    }
+
+    @Override
+    public void add(IN value) throws Exception {
+        original.add(value);
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public ACC getInternal() throws Exception {
+        return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) throws Exception {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces.

> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> TTL state decorator uses original state with packed TTL and adds TTL logic
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>     ValueState<TtlValue<V>> underlyingState;
>     InternalTimeService timeProvider;
>     V value() {
>         TtlValue<V> valueWithTtl = underlyingState.get();
>         // ttl logic here (e.g. update timestamp)
>         return valueWithTtl.getValue();
>     }
>     void update() { ... underlyingState.update(valueWithTtl) ... }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its TTL
> wrapper from FLINK-9513

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518894#comment-16518894 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196995820

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the `TTL` check for `original.mergeNamespaces()`? Since we need to query the state when merging namespaces.

> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518897#comment-16518897 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197004891

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+    private final T userValue;
+    private final long expirationTimestamp;
--- End diff --

The `expirationTimestamp` is an absolute timestamp; should we shift the timestamps in `TtlValue` on checkpoint & recovery? For example, say a user runs with `ProcessTime` as the time characteristic and sets `TTL = 10min`. If they trigger a savepoint and restore the job from it 11 minutes later, then without a timestamp shift all of the state will already have expired.

> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
> TTL state decorator uses original state with packed TTL and adds TTL logic
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>     ValueState<TtlValue<V>> underlyingState;
>     InternalTimeService timeProvider;
>     V value() {
>         TtlValue<V> valueWithTtl = underlyingState.get();
>         // ttl logic here (e.g. update timestamp)
>         return valueWithTtl.getValue();
>     }
>     void update() { ... underlyingState.update(valueWithTtl) ... }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its TTL
> wrapper from FLINK-9513

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
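For illustration, a minimal sketch of the shift the comment proposes; the method and its parameters are hypothetical, not part of the PR:

{code:java}
final class TimestampShiftSketch {

    // On restore, move each expiration timestamp forward by the wall-clock
    // time that passed between savepoint and recovery, so state that was
    // alive at savepoint time is still alive right after recovery.
    static long shiftOnRestore(long expirationTimestamp, long savepointTime, long recoveryTime) {
        long downtime = recoveryTime - savepointTime;
        return expirationTimestamp + downtime;
    }
}
{code}

With this adjustment, a value that had 10 minutes of TTL left at savepoint time would again have 10 minutes left after an 11-minute recovery gap, instead of being expired on restore.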
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518895#comment-16518895 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196996094

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
--- End diff --

This looks a bit weird; my gut feeling is that `get()` should call `getInternal()` (just as `update()` calls `updateInternal()` in this class), but here it is the reverse.

> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
> Pro
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197001110

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+    extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+    implements InternalReducingState<K, N, T> {
+    TtlReducingState(
+        InternalReducingState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<T> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public T get() throws Exception {
+        return getInternal();
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        original.add(wrapWithTs(value, Long.MAX_VALUE));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces.

---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196995820

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the `TTL` check for `original.mergeNamespaces()`? Since we need to query the state when merging namespaces.

---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196998472

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

If we called `getInternal()` from `get()` and made `updateTs()` accept an `Iterable`, then this method could seemingly be removed. (Or, at the least, we should add a check for whether the `iterable` is assignable from `List`; if true, we could cast it to List and return immediately.)

---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197004891

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+    private final T userValue;
+    private final long expirationTimestamp;
--- End diff --

The `expirationTimestamp` is an absolute timestamp; should we shift the timestamps in `TtlValue` on checkpoint & recovery? For example, say a user runs with `ProcessTime` as the time characteristic and sets `TTL = 10min`. If they trigger a savepoint and restore the job from it 11 minutes later, then without a timestamp shift all of the state will already have expired.

---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197001339

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState<K, N, IN, ACC, OUT>
+    extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
+    implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+
+    TtlAggregatingState(
+        InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<ACC> valueSerializer,
+        TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        super(originalState, config, timeProvider, valueSerializer);
+        aggregateFunction.stateClear = originalState::clear;
+        aggregateFunction.updater = originalState::updateInternal;
+    }
+
+    @Override
+    public OUT get() throws Exception {
+        return original.get();
+    }
+
+    @Override
+    public void add(IN value) throws Exception {
+        original.add(value);
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public ACC getInternal() throws Exception {
+        return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) throws Exception {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces.

---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196996094

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
--- End diff --

This looks a bit weird; my gut feeling is that `get()` should call `getInternal()` (just as `update()` calls `updateInternal()` in this class), but here it is the reverse.

---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196996839

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+        InternalListState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
--- End diff --

In this block we iterate `ttlValue` twice: once for `collect()` and once for `updateTs()`. If we made `updateTs()` accept an `Iterable` as its argument, we could avoid the `collect()` here; that way we only need to iterate `ttlValue` once.

---
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518889#comment-16518889 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r197005582 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.table.typeutils.TypeStringUtils + +/** + * Descriptor for a primitive type. + */ +class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor { + + // TODO not sure if we should the BasicTypeInfo here + var typeInformation: BasicTypeInfo[T] = _ + var value: T = _ + + def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = { --- End diff -- Yes, `TypeStringUtils` can extract every valid type information, and the user can specify the type via the API (e.g., `setType(Types.SHORT)`). I just wonder how the basic types supported by Java can be properly inferred from the config file (e.g., how to decide whether a parameter `1` is a byte, a short, or an int). Although a short or a byte value can be represented with an int, that choice affects the constructor search via Java reflection. Do you have any ideas for that? > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r197005582 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.table.typeutils.TypeStringUtils + +/** + * Descriptor for a primitive type. + */ +class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor { + + // TODO not sure if we should the BasicTypeInfo here + var typeInformation: BasicTypeInfo[T] = _ + var value: T = _ + + def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = { --- End diff -- Yes, `TypeStringUtils` can extract every valid type information, and the user can specify the type via the API (e.g., `setType(Types.SHORT)`). I just wonder how the basic types supported by Java can be properly inferred from the config file (e.g., how to decide whether a parameter `1` is a byte, a short, or an int). Although a short or a byte value can be represented with an int, that choice affects the constructor search via Java reflection. Do you have any ideas for that? ---
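The ambiguity raised here is easy to reproduce: reflection-based constructor lookup requires the exact declared parameter type, so reading `1` as an int fails against a constructor declared with short. A minimal sketch with a hypothetical UDF class:
{code:java}
import java.lang.reflect.Constructor;

public class ConstructorLookup {
	// Hypothetical UDF: only a short-typed constructor is declared.
	public static class MyUdf {
		public MyUdf(short s) { }
	}

	public static void main(String[] args) throws Exception {
		// Succeeds: the declared parameter type matches exactly.
		Constructor<MyUdf> ok = MyUdf.class.getConstructor(short.class);
		System.out.println(ok);

		// Throws NoSuchMethodException: reading the config value "1" as an int
		// finds no matching constructor, even though a short could hold the value.
		Constructor<MyUdf> missing = MyUdf.class.getConstructor(int.class);
		System.out.println(missing);
	}
}
{code}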
[jira] [Created] (FLINK-9632) SlotPool should notify the call when allocateSlot meet an exception
shuai.xu created FLINK-9632: --- Summary: SlotPool should notify the call when allocateSlot meet an exception Key: FLINK-9632 URL: https://issues.apache.org/jira/browse/FLINK-9632 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu In SlotPool, allocateSlot() returns a CompletableFuture, but this future is only completed when slotAndLocalityFuture returns a LogicalSlot; if slotAndLocalityFuture completes exceptionally, the returned future is never completed, so the caller never learns about the failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9632) SlotPool should notify the caller when allocateSlot meet an exception
[ https://issues.apache.org/jira/browse/FLINK-9632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-9632: Summary: SlotPool should notify the caller when allocateSlot meet an exception (was: SlotPool should notify the call when allocateSlot meet an exception) > SlotPool should notify the caller when allocateSlot meet an exception > - > > Key: FLINK-9632 > URL: https://issues.apache.org/jira/browse/FLINK-9632 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > In SlotPool, allocateSlot() returns a CompletableFuture, but this future is > only completed when slotAndLocalityFuture returns a LogicalSlot; if > slotAndLocalityFuture completes exceptionally, the returned future is never > completed, so the caller never learns about the failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
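The failure mode the issue describes, and the usual fix, can be sketched with plain CompletableFutures (names are generic placeholders, not the actual SlotPool code):
{code:java}
import java.util.concurrent.CompletableFuture;

public class FuturePropagation {
	public static void main(String[] args) {
		CompletableFuture<String> inner = new CompletableFuture<>();
		CompletableFuture<String> result = new CompletableFuture<>();

		// Buggy pattern: only the success value is forwarded, so an
		// exceptionally completed inner future leaves result pending forever:
		// inner.thenAccept(result::complete);

		// Complete pattern: forward both the value and the exception.
		inner.whenComplete((value, failure) -> {
			if (failure != null) {
				result.completeExceptionally(failure);
			} else {
				result.complete(value);
			}
		});

		inner.completeExceptionally(new RuntimeException("allocation failed"));
		System.out.println(result.isCompletedExceptionally()); // true
	}
}
{code}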
[jira] [Created] (FLINK-9631) use Files.createDirectories instead of directory.mkdirs
makeyang created FLINK-9631: --- Summary: use Files.createDirectories instead of directory.mkdirs Key: FLINK-9631 URL: https://issues.apache.org/jira/browse/FLINK-9631 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 1.4.2, 1.5.0 Environment: flink1.4 jdk1.8 latest linux 2.6 Reporter: makeyang Assignee: makeyang A job can't be run due to the exception below: {color:#6a8759}Could not create RocksDB data directory{color} but with this exception alone, I can't tell exactly why. So I suggest that Files.createDirectories, which throws an exception on failure, be used rather than File.mkdirs. I have some more suggestions: # Should we use Files.createDirectories to replace File.mkdirs? # Each time a task manager throws an exception to the job manager, should IP+nodeId be contained in the exception? This would mean defining more Flink exceptions that wrap other exceptions, such as JDK exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
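The difference the reporter points at, sketched with a placeholder path: File.mkdirs() signals failure with a bare boolean, while Files.createDirectories() throws an IOException carrying the cause:
{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateDirs {
	public static void main(String[] args) {
		// Old style: failure is a bare boolean, the cause is lost.
		File dir = new File("/some/rocksdb/data/dir");
		if (!dir.mkdirs()) {
			System.err.println("Could not create directory (no reason available)");
		}

		// Suggested style: failure carries a typed exception and message.
		try {
			Files.createDirectories(Paths.get("/some/rocksdb/data/dir"));
		} catch (IOException e) {
			System.err.println("Could not create directory: " + e);
		}
	}
}
{code}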
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518854#comment-16518854 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196998269 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.table.api.TableException +import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE +import scala.collection.JavaConversions._ + +/** + * Validator for [[PrimitiveTypeDescriptor]]. + */ +class PrimitiveTypeValidator extends HierarchyDescriptorValidator { + override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = { +properties + .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false) +properties + .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1) + } +} + +object PrimitiveTypeValidator { + val PRIMITIVE_TYPE = "type" + val PRIMITIVE_VALUE = "value" + + def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = { +val typeInfo = + properties.getType(s"$keyPrefix$PRIMITIVE_TYPE") +val valueKey = s"$keyPrefix$PRIMITIVE_VALUE" +val value = typeInfo match { + case basicType: BasicTypeInfo[_] => +basicType match { + case BasicTypeInfo.INT_TYPE_INFO => +properties.getInt(valueKey) + case BasicTypeInfo.LONG_TYPE_INFO => +properties.getLong(valueKey) + case BasicTypeInfo.DOUBLE_TYPE_INFO => +properties.getDouble(valueKey) + case BasicTypeInfo.STRING_TYPE_INFO => +properties.getString(valueKey) + case BasicTypeInfo.BOOLEAN_TYPE_INFO => +properties.getBoolean(valueKey) + //TODO add more types --- End diff -- Ah, yes. The array type was not considered. I'll think about that and add the support if it's not hard to implement. Otherwise, we could defer it to a follow-up issue. > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196998269 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.table.api.TableException +import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE +import scala.collection.JavaConversions._ + +/** + * Validator for [[PrimitiveTypeDescriptor]]. + */ +class PrimitiveTypeValidator extends HierarchyDescriptorValidator { + override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = { +properties + .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false) +properties + .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1) + } +} + +object PrimitiveTypeValidator { + val PRIMITIVE_TYPE = "type" + val PRIMITIVE_VALUE = "value" + + def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = { +val typeInfo = + properties.getType(s"$keyPrefix$PRIMITIVE_TYPE") +val valueKey = s"$keyPrefix$PRIMITIVE_VALUE" +val value = typeInfo match { + case basicType: BasicTypeInfo[_] => +basicType match { + case BasicTypeInfo.INT_TYPE_INFO => +properties.getInt(valueKey) + case BasicTypeInfo.LONG_TYPE_INFO => +properties.getLong(valueKey) + case BasicTypeInfo.DOUBLE_TYPE_INFO => +properties.getDouble(valueKey) + case BasicTypeInfo.STRING_TYPE_INFO => +properties.getString(valueKey) + case BasicTypeInfo.BOOLEAN_TYPE_INFO => +properties.getBoolean(valueKey) + //TODO add more types --- End diff -- Ah, yes. The array type was not considered. I'll think about that and add the support if it's not hard to implement. Otherwise, we could defer it to a follow-up issue. ---
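How an array branch could slot into such a type dispatch is sketched below in Java (the PR's code is Scala; the type names, element separator, and method here are illustrative only):
{code:java}
import java.util.Arrays;

public class ArrayValueSketch {
	// Hypothetical dispatch: parse a flat config value according to a type tag.
	static Object deriveValue(String type, String raw) {
		switch (type) {
			case "INT":
				return Integer.parseInt(raw);
			case "BOOLEAN":
				return Boolean.parseBoolean(raw);
			case "INT[]":
				// e.g. raw = "1;2;3" with some agreed element separator:
				// split, then parse each element with the component type's parser.
				return Arrays.stream(raw.split(";"))
					.mapToInt(Integer::parseInt)
					.toArray();
			default:
				throw new IllegalArgumentException("Unsupported type: " + type);
		}
	}

	public static void main(String[] args) {
		System.out.println(Arrays.toString((int[]) deriveValue("INT[]", "1;2;3"))); // [1, 2, 3]
	}
}
{code}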
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196996876 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway.utils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +/** + * A bunch of UDFs for SQL-Client test. + */ +public class UserDefinedFunctions { --- End diff -- The testing functions provided in `flink-table` are mainly intended for functional tests, while the functions added here are mainly intended for constructor tests, i.e., I made the UDF constructors complex or even sort of unreasonable... ---
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518839#comment-16518839 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196996876 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway.utils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; +
/** + * A bunch of UDFs for SQL-Client test. + */ +public class UserDefinedFunctions { --- End diff -- The testing functions provided in `flink-table` are mainly intended for functional tests, while the functions added here are mainly intended for constructor tests, i.e., I made the UDF constructors complex or even sort of unreasonable... > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518822#comment-16518822 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196996002 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java --- @@ -145,6 +146,68 @@ public void testTableSchema() throws Exception { assertEquals(expectedTableSchema, actualTableSchema); } + @Test(timeout = 30_000L) + public void testScalarUDF() throws Exception { --- End diff -- I'll add more test cases for that. > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518820#comment-16518820 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196995942 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java --- @@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine private final StreamExecutionEnvironment streamExecEnv; private final TableEnvironment tableEnv; + @SuppressWarnings("unchecked") --- End diff -- The `AggregateFunction` and `TableFunction` take generic type parameters. Thus, when doing conversions (e.g., `streamTableEnvironment.registerFunction(k, (AggregateFunction) udf);`), the Java compiler complains about the unchecked cast. > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196996002 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java --- @@ -145,6 +146,68 @@ public void testTableSchema() throws Exception { assertEquals(expectedTableSchema, actualTableSchema); } + @Test(timeout = 30_000L) + public void testScalarUDF() throws Exception { --- End diff -- I'll add more test cases for that. ---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196995942 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java --- @@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine private final StreamExecutionEnvironment streamExecEnv; private final TableEnvironment tableEnv; + @SuppressWarnings("unchecked") --- End diff -- The `AggregateFunction` and `TableFunction` take generic type parameters. Thus, when doing conversions (e.g., `streamTableEnvironment.registerFunction(k, (AggregateFunction) udf);`), the Java compiler complains about the unchecked cast. ---
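Why the annotation is needed can be shown with a generic placeholder type: a cast from Object to a parameterized type cannot be verified at runtime, so the compiler emits an unchecked warning unless it is suppressed:
{code:java}
import java.util.function.Function;

public class UncheckedCastSketch {
	@SuppressWarnings("unchecked")
	static void register(String name, Object udf) {
		// Unchecked: the generic parameters of the Function cannot be checked
		// at runtime, hence the compiler warning without the annotation above.
		Function<Integer, String> typed = (Function<Integer, String>) udf;
		System.out.println(name + " -> " + typed.apply(42));
	}

	public static void main(String[] args) {
		Function<Integer, String> f = i -> "value " + i;
		register("myFunction", f);
	}
}
{code}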
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518813#comment-16518813 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196995241 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java --- @@ -101,6 +110,16 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo } }); + // generate user-defined functions + functions = new HashMap<>(); + mergedEnv.getFunctions().forEach((name, descriptor) -> { + DescriptorProperties properties = new DescriptorProperties(true); + descriptor.addProperties(properties); + functions.put( + name, + FunctionValidator.generateUserDefinedFunction(properties, classLoader)); --- End diff -- That's a good idea. > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518812#comment-16518812 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196995191 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.descriptors.ClassTypeDescriptor; +import org.apache.flink.table.descriptors.ClassTypeValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FunctionDescriptor; +import org.apache.flink.table.descriptors.FunctionValidator; +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor; +import org.apache.flink.table.descriptors.PrimitiveTypeValidator; +import org.apache.flink.table.typeutils.TypeStringUtils; + +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS; + +/** + * Descriptor for user-defined functions. + */ +public class UDFDescriptor extends FunctionDescriptor { --- End diff -- I'm not sure whether more descriptors (for other purposes) will extend `FunctionDescriptor` in the future. How about renaming it to `UserDefinedFunction`? > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. 
The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196995241 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java --- @@ -101,6 +110,16 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo } }); + // generate user-defined functions + functions = new HashMap<>(); + mergedEnv.getFunctions().forEach((name, descriptor) -> { + DescriptorProperties properties = new DescriptorProperties(true); + descriptor.addProperties(properties); + functions.put( + name, + FunctionValidator.generateUserDefinedFunction(properties, classLoader)); --- End diff -- That's a good idea. ---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196995191 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.descriptors.ClassTypeDescriptor; +import org.apache.flink.table.descriptors.ClassTypeValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FunctionDescriptor; +import org.apache.flink.table.descriptors.FunctionValidator; +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor; +import org.apache.flink.table.descriptors.PrimitiveTypeValidator; +import org.apache.flink.table.typeutils.TypeStringUtils; + +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS; + +/** + * Descriptor for user-defined functions. + */ +public class UDFDescriptor extends FunctionDescriptor { --- End diff -- I'm not sure whether more descriptors (for other purposes) will extend `FunctionDescriptor` in the future. How about renaming it to `UserDefinedFunction`? ---
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518810#comment-16518810 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196995095 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Maybe we should also check that the `ttl` is greater than 0? > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > TTL state decorator uses the original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9514: -- Labels: pull-request-available (was: ) > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > TTL state decorator uses the original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196995095 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Maybe we should also check that the `ttl` is greater than 0? ---
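The suggested extra check might look like this (plain Java instead of Flink's Preconditions, to keep the sketch self-contained; the class name is a placeholder):
{code:java}
public class TtlConfigCheck {
	private final long ttlMillis;

	public TtlConfigCheck(long ttlMillis) {
		// Reject non-positive TTLs at construction time, as the reviewer suggests.
		if (ttlMillis <= 0) {
			throw new IllegalArgumentException("TTL must be greater than 0, was: " + ttlMillis);
		}
		this.ttlMillis = ttlMillis;
	}

	public long getTtlMillis() {
		return ttlMillis;
	}

	public static void main(String[] args) {
		System.out.println(new TtlConfigCheck(1000).getTtlMillis()); // ok: prints 1000
		new TtlConfigCheck(0); // throws IllegalArgumentException
	}
}
{code}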
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518808#comment-16518808 ] ASF GitHub Bot commented on FLINK-8863: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196994767 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.descriptors.ClassTypeDescriptor; +import org.apache.flink.table.descriptors.ClassTypeValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FunctionDescriptor; +import org.apache.flink.table.descriptors.FunctionValidator; +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor; +import org.apache.flink.table.descriptors.PrimitiveTypeValidator; +import org.apache.flink.table.typeutils.TypeStringUtils; + +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS; + +/** + * Descriptor for user-defined functions. + */ +public class UDFDescriptor extends FunctionDescriptor { + + private static final String FROM = "from"; + + private From from; + + private UDFDescriptor(String name, From from) { + super(name); + this.from = from; + } + + public From getFrom() { + return from; + } + + /** +* Create a UDF descriptor with the given config. +*/ + public static UDFDescriptor create(Map config) { + if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) { + throw new SqlClientException("The 'name' attribute of a function is missing."); + } + + final Object name = config.get(FunctionValidator.FUNCTION_NAME()); + if (!(name instanceof String) || ((String) name).length() <= 0) { + throw new SqlClientException("Invalid function name '" + name + "'."); + } --- End diff -- At first, I did use `ConfigUtil.normalizeYaml` for that. But I found that the type information for some parameters was lost in this process, e.g., we could not tell whether a `false` was a boolean or a string. 
> Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8863: -- Labels: pull-request-available (was: ) > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/6090#discussion_r196994767 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.descriptors.ClassTypeDescriptor; +import org.apache.flink.table.descriptors.ClassTypeValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FunctionDescriptor; +import org.apache.flink.table.descriptors.FunctionValidator; +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor; +import org.apache.flink.table.descriptors.PrimitiveTypeValidator; +import org.apache.flink.table.typeutils.TypeStringUtils; + +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS; + +/** + * Descriptor for user-defined functions. + */ +public class UDFDescriptor extends FunctionDescriptor { + + private static final String FROM = "from"; + + private From from; + + private UDFDescriptor(String name, From from) { + super(name); + this.from = from; + } + + public From getFrom() { + return from; + } + + /** +* Create a UDF descriptor with the given config. +*/ + public static UDFDescriptor create(Map config) { + if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) { + throw new SqlClientException("The 'name' attribute of a function is missing."); + } + + final Object name = config.get(FunctionValidator.FUNCTION_NAME()); + if (!(name instanceof String) || ((String) name).length() <= 0) { + throw new SqlClientException("Invalid function name '" + name + "'."); + } --- End diff -- At first, I did use `ConfigUtil.normalizeYaml` for that. But I found that the type information for some parameters was lost in this process, e.g., we could not tell whether a `false` was a boolean or a string. ---
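The type loss described here is a property of string normalization, not of the YAML parser itself; a small sketch with SnakeYAML (assumed to be on the classpath) shows where the distinction disappears:
{code:java}
import org.yaml.snakeyaml.Yaml;

import java.util.Map;

public class YamlTypes {
	public static void main(String[] args) {
		Yaml yaml = new Yaml();
		// The parser does distinguish an unquoted false from a quoted "false".
		Map<?, ?> parsed = (Map<?, ?>) yaml.load("a: false\nb: \"false\"");
		System.out.println(parsed.get("a").getClass()); // class java.lang.Boolean
		System.out.println(parsed.get("b").getClass()); // class java.lang.String

		// But any normalization step that flattens values to strings throws
		// that distinction away: both become the string "false".
		System.out.println(String.valueOf(parsed.get("a"))
			.equals(String.valueOf(parsed.get("b")))); // true
	}
}
{code}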
[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youjun Yuan updated FLINK-9630: --- Description: When the Kafka topic got deleted, during the task starting process, Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, hence resulting in a TCP connection leak (to the Kafka broker). *This issue can bring down the whole Flink cluster*, because, in a default setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly schedule the job to any TaskManager that has a free slot, and each attempt will cause the TaskManager to leak a TCP connection; eventually almost every TaskManager will run out of file handles, hence no TaskManager can make a snapshot or accept a new job. This effectively stops the whole cluster. The leak happens when StreamTask.invoke() calls openAllOperators(), then FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); when kafkaConsumer.partitionsFor(topic) in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a *TopicAuthorizationException,* no one catches it. Though StreamTask.open catches the exception and invokes the dispose() method of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), it does not close the kafkaConsumer in partitionDiscoverer, nor even invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null. Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience: public void cancel() { // set ourselves as not running; // this would let the main discovery loop escape as soon as possible running = false; if (discoveryLoopThread != null) { if (partitionDiscoverer != null) { // we cannot close the discoverer here, as it is error-prone to concurrent access; // only wakeup the discoverer, the discovery loop will clean itself up after it escapes partitionDiscoverer.wakeup(); } // the discovery loop may currently be sleeping in-between // consecutive discoveries; interrupt to shutdown faster discoveryLoopThread.interrupt(); } // abort the fetcher, if there is one if (kafkaFetcher != null) { kafkaFetcher.cancel(); } } was: When the Kafka topic got deleted, during the task starting process, Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, hence resulting in a TCP connection leak (to the Kafka broker). *This issue can bring down the whole Flink cluster*, because, in a default setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly schedule the job to any TaskManager that has a free slot, and each attempt will cause the TaskManager to leak a TCP connection; eventually almost every TaskManager will run out of file handles, hence no TaskManager can make a snapshot or accept a new job. This effectively stops the whole cluster. The leak happens when StreamTask.invoke() calls openAllOperators(), then FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); when kafkaConsumer.partitionsFor(topic) in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a *TopicAuthorizationException,* no one catches it. Though StreamTask.open catches the exception and invokes the dispose() method of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), it does not close the kafkaConsumer in partitionDiscoverer, nor even invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null. 
Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience: public void cancel() { // set ourselves as not running; // this would let the main discovery loop escape as soon as possible running = false; if (discoveryLoopThread != null) { if (partitionDiscoverer != null) { // we cannot close the discoverer here, as it is error-prone to concurrent access; // only wakeup the discoverer, the discovery loop will clean itself up after it escapes partitionDiscoverer.wakeup(); } // the discovery loop may currently be sleeping in-between // consecutive discoveries; interrupt to shutdown faster discoveryLoopThread.interrupt(); } // abort the fetcher, if there is one if (kafkaFetcher != null) { kafkaFetcher.cancel(); } } I tried to fix it by catching *TopicAuthorizationException* in Kafka09PartitionDiscoverer.getAllPartitionsForTopics() and closing the kafkaConsumer, which has been verified to work. So I'd like to take this issue. > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > ---
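The fix the reporter outlines, sketched against the public Kafka consumer API (simplified; not the actual Kafka09PartitionDiscoverer code):
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;

import java.util.ArrayList;
import java.util.List;

public class DiscovererSketch {
	// If partitionsFor() throws a TopicAuthorizationException, close the
	// consumer before rethrowing so the TCP connection to the broker is released.
	static List<PartitionInfo> partitionsForTopics(KafkaConsumer<?, ?> consumer, List<String> topics) {
		List<PartitionInfo> partitions = new ArrayList<>();
		try {
			for (String topic : topics) {
				List<PartitionInfo> info = consumer.partitionsFor(topic);
				if (info != null) {
					partitions.addAll(info);
				}
			}
		} catch (TopicAuthorizationException e) {
			consumer.close(); // release the broker connection before failing
			throw e;
		}
		return partitions;
	}
}
{code}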
[jira] [Created] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
Youjun Yuan created FLINK-9630: -- Summary: Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException Key: FLINK-9630 URL: https://issues.apache.org/jira/browse/FLINK-9630 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.4.2, 1.5.0 Environment: Linux 2.6, java 8, Kafka broker 0.10.x Reporter: Youjun Yuan Fix For: 1.5.1 When the Kafka topic got deleted, during the task starting process, Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, hence resulting in a TCP connection leak (to the Kafka broker). *This issue can bring down the whole Flink cluster*, because, in a default setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly schedule the job to any TaskManager that has a free slot, and each attempt will cause the TaskManager to leak a TCP connection; eventually almost every TaskManager will run out of file handles, hence no TaskManager can make a snapshot or accept a new job. This effectively stops the whole cluster. The leak happens when StreamTask.invoke() calls openAllOperators(), then FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); when kafkaConsumer.partitionsFor(topic) in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a *TopicAuthorizationException,* no one catches it. Though StreamTask.open catches the exception and invokes the dispose() method of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), it does not close the kafkaConsumer in partitionDiscoverer, nor even invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null. Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience: public void cancel() { // set ourselves as not running; // this would let the main discovery loop escape as soon as possible running = false; if (discoveryLoopThread != null) { if (partitionDiscoverer != null) { // we cannot close the discoverer here, as it is error-prone to concurrent access; // only wakeup the discoverer, the discovery loop will clean itself up after it escapes partitionDiscoverer.wakeup(); } // the discovery loop may currently be sleeping in-between // consecutive discoveries; interrupt to shutdown faster discoveryLoopThread.interrupt(); } // abort the fetcher, if there is one if (kafkaFetcher != null) { kafkaFetcher.cancel(); } } -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies
[ https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9629: -- Labels: pull-request-available (was: ) > Datadog metrics reporter does not have shaded dependencies > -- > > Key: FLINK-9629 > URL: https://issues.apache.org/jira/browse/FLINK-9629 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0, 1.5.1 >Reporter: Georgii Gobozov >Priority: Major > Labels: pull-request-available > > flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for > okhttp3 and okio -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies
[ https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518747#comment-16518747 ] ASF GitHub Bot commented on FLINK-9629: --- GitHub user gobozov opened a pull request: https://github.com/apache/flink/pull/6191 [FLINK-9629][metrics] Included datadog shaded dependencies for okhttp3 and okio You can merge this pull request into a Git repository by running: $ git pull https://github.com/gobozov/flink release-1.5 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6191.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6191 commit b0904ffc31d4a9272bf852766f15210387504420 Author: Georgii.Gobozov Date: 2018-06-21T00:17:52Z FLINK-9629: included shaded dependencies for okhttp3 and okio > Datadog metrics reporter does not have shaded dependencies > -- > > Key: FLINK-9629 > URL: https://issues.apache.org/jira/browse/FLINK-9629 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0, 1.5.1 >Reporter: Georgii Gobozov >Priority: Major > Labels: pull-request-available > > flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for > okhttp3 and okio -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6191: [FLINK-9629][metrics] Included datadog shaded depe...
GitHub user gobozov opened a pull request: https://github.com/apache/flink/pull/6191 [FLINK-9629][metrics] Included datadog shaded dependencies for okhttp3 and okio You can merge this pull request into a Git repository by running: $ git pull https://github.com/gobozov/flink release-1.5 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6191.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6191 commit b0904ffc31d4a9272bf852766f15210387504420 Author: Georgii.Gobozov Date: 2018-06-21T00:17:52Z FLINK-9629: included shaded dependencies for okhttp3 and okio ---
[jira] [Created] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies
Georgii Gobozov created FLINK-9629: -- Summary: Datadog metrics reporter does not have shaded dependencies Key: FLINK-9629 URL: https://issues.apache.org/jira/browse/FLINK-9629 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.5.0, 1.6.0, 1.5.1 Reporter: Georgii Gobozov flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for okhttp3 and okio -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9627) Extending 'KafkaJsonTableSource' according to comments will result in NPE
[ https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9627: --- Assignee: vinoyang

> Extending 'KafkaJsonTableSource' according to comments will result in NPE
> -
>
> Key: FLINK-9627
> URL: https://issues.apache.org/jira/browse/FLINK-9627
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.5.0
> Reporter: Dominik Wosiński
> Assignee: vinoyang
> Priority: Major
>
> According to the comments, what is needed to extend 'KafkaJsonTableSource' looks as follows:
>
> {code:java}
> A version-agnostic Kafka JSON {@link StreamTableSource}.
> *
> * The version-specific Kafka consumers need to extend this class and
> * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
> *
> * The field names are used to parse the JSON file and so are the types.{code}
> This will cause an NPE, since there is no default value for startupMode in the abstract class itself, only in the builder of this class.
> In the 'getKafkaConsumer' method, the switch statement will be executed on the non-initialized 'startupMode' field:
> {code:java}
> switch (startupMode) {
> 	case EARLIEST:
> 		kafkaConsumer.setStartFromEarliest();
> 		break;
> 	case LATEST:
> 		kafkaConsumer.setStartFromLatest();
> 		break;
> 	case GROUP_OFFSETS:
> 		kafkaConsumer.setStartFromGroupOffsets();
> 		break;
> 	case SPECIFIC_OFFSETS:
> 		kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
> 		break;
> }{code}
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
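A minimal sketch of one way to avoid the NPE described above, assuming the startupMode field from the quoted snippet; defaulting to GROUP_OFFSETS is an assumption borrowed from the builder, not the actual fix shipped for this issue:

// Hypothetical sketch, option 1: give the abstract class a safe default so
// subclasses created without the builder never hit the NPE in the switch.
// StartupMode.GROUP_OFFSETS as the default is an assumption here.
private StartupMode startupMode = StartupMode.GROUP_OFFSETS;

// Hypothetical sketch, option 2: fail fast with a clear message before the
// switch from the snippet above executes.
if (startupMode == null) {
	throw new IllegalStateException(
		"startupMode was never configured; set it via the builder or explicitly.");
}
switch (startupMode) {
	case EARLIEST:
		kafkaConsumer.setStartFromEarliest();
		break;
	// ... remaining cases as in the snippet above ...
}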
[GitHub] flink pull request #6190: Add a simple pulsar source connector.
Github user cckellogg closed the pull request at: https://github.com/apache/flink/pull/6190 ---
[GitHub] flink pull request #6190: Add a simple pulsar source connector.
GitHub user cckellogg opened a pull request:

https://github.com/apache/flink/pull/6190

Add a simple pulsar source connector.

*Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*

## Contribution Checklist

- Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
- Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
- Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
- Each pull request should address only one issue, not mix up code from multiple issues.
- Each commit in the pull request has a meaningful commit message (including the JIRA id).
- Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cckellogg/flink flink-pulsar-source-connector

Alternatively you can review and apply these changes as the patch at:

https://github.
[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure
[ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518631#comment-16518631 ] ASF GitHub Bot commented on FLINK-9374: --- Github user gliu6 commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r196952063 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { --- End diff -- Btw, I am not requesting any changes; the PR looks good to me for its defined purpose, I just wonder how to configure it easily. Now that I think about this a bit more: would it be better to expose the `queue size` to the user instead of the `queue limit` (a record count)? That is, inside the FKP class, define an integer recordSize, and inside the invoke function do a moving-average calculation of recordSize with `serialized.remaining()` dynamically. > Flink Kinesis Producer does not backpressure > > > Key: FLINK-9374 > URL: https://issues.apache.org/jira/browse/FLINK-9374 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Franz Thoma >Priority: Critical > Labels: pull-request-available > Attachments: after.png, before.png > > > The {{FlinkKinesisProducer}} just accepts records and forwards them to a > {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL > internally holds an unbounded queue of records that have not yet been sent. > Since Kinesis is rate-limited to 1MB per second per shard, this queue may > grow indefinitely if Flink sends records faster than the KPL can forward them > to Kinesis. > One way to circumvent this problem is to set a record TTL, so that queued > records are dropped after a certain amount of time, but this will lead to > data loss under high loads. > Currently the only time the queue is flushed is during checkpointing: > {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a > checkpoint is reached (and will wait until the queue is flushed), or until > out-of-memory, whichever is reached first. (This gets worse due to the fact > that the Java KPL is only a thin wrapper around a C++ process, so it is not > even the Java process that runs out of memory, but the C++ process.) The > implicit rate-limit due to checkpointing leads to a ragged throughput graph > like this (the periods with zero throughput are the wait times before a > checkpoint): > !before.png! Throughput > limited by checkpointing only > My proposed solution is to add a config option {{queueLimit}} to set a > maximum number of records that may be waiting in the KPL queue. If this limit > is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and > wait (blocking) until the queue length is below the limit again. This > automatically leads to backpressuring, since the {{FlinkKinesisProducer}} > cannot accept records while waiting. For compatibility, {{queueLimit}} is set > to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a > client explicitly sets the value. 
Setting a »sane« default value is not > possible unfortunately, since sensible values for the limit depend on the > record size (the limit should be chosen so that about 10–100MB of records per > shard are accumulated before flushing, otherwise the maximum Kinesis > throughput may not be reached). > !after.png! Throughput with a queue limit of 10 records (the spikes are > checkpoints, where the queue is still flushed completely) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
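A minimal sketch of the queueLimit-based backpressure proposed in the issue above, assuming the producer and queueLimit fields of FlinkKinesisProducer; getOutstandingRecordsCount() and flush() are KPL methods, while the poll-and-sleep wait strategy is an illustrative choice, not the PR's actual code:

// Hypothetical sketch: block the task thread until the KPL queue has
// drained below the limit. Blocking inside invoke() is what propagates
// backpressure upstream in Flink.
private void enforceQueueLimit() throws InterruptedException {
	while (producer.getOutstandingRecordsCount() >= queueLimit) {
		// ask the KPL to send queued records now...
		producer.flush();
		// ...and wait before re-checking the outstanding record count
		Thread.sleep(500);
	}
}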
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user gliu6 commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r196952063 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { --- End diff -- Btw, I am not requesting any changes; the PR looks good to me for its defined purpose, I just wonder how to configure it easily. Now that I think about this a bit more: would it be better to expose the `queue size` to the user instead of the `queue limit` (a record count)? That is, inside the FKP class, define an integer recordSize, and inside the invoke function do a moving-average calculation of recordSize with `serialized.remaining()` dynamically. ---
[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure
[ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518590#comment-16518590 ] ASF GitHub Bot commented on FLINK-9374: --- Github user gliu6 commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r196940135 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { --- End diff -- I wonder whether we could adjust the queue limit dynamically. You mentioned that `queue limit = (number of shards * queue size per shard) / record size`. Except for the record size, all the others are relatively easy to set. For me, I don't really know the record size until the application starts. Also, what if the record size varies over time? So how about adding a queueLimit supplier function here to allow the user to supply how the queueLimit is calculated dynamically? > Flink Kinesis Producer does not backpressure > > > Key: FLINK-9374 > URL: https://issues.apache.org/jira/browse/FLINK-9374 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Franz Thoma >Priority: Critical > Labels: pull-request-available > Attachments: after.png, before.png > > > The {{FlinkKinesisProducer}} just accepts records and forwards them to a > {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL > internally holds an unbounded queue of records that have not yet been sent. > Since Kinesis is rate-limited to 1MB per second per shard, this queue may > grow indefinitely if Flink sends records faster than the KPL can forward them > to Kinesis. > One way to circumvent this problem is to set a record TTL, so that queued > records are dropped after a certain amount of time, but this will lead to > data loss under high loads. > Currently the only time the queue is flushed is during checkpointing: > {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a > checkpoint is reached (and will wait until the queue is flushed), or until > out-of-memory, whichever is reached first. (This gets worse due to the fact > that the Java KPL is only a thin wrapper around a C++ process, so it is not > even the Java process that runs out of memory, but the C++ process.) The > implicit rate-limit due to checkpointing leads to a ragged throughput graph > like this (the periods with zero throughput are the wait times before a > checkpoint): > !before.png! Throughput > limited by checkpointing only > My proposed solution is to add a config option {{queueLimit}} to set a > maximum number of records that may be waiting in the KPL queue. If this limit > is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and > wait (blocking) until the queue length is below the limit again. This > automatically leads to backpressuring, since the {{FlinkKinesisProducer}} > cannot accept records while waiting. For compatibility, {{queueLimit}} is set > to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a > client explicitly sets the value. 
Setting a »sane« default value is not > possible unfortunately, since sensible values for the limit depend on the > record size (the limit should be chosen so that about 10–100MB of records per > shard are accumulated before flushing, otherwise the maximum Kinesis > throughput may not be reached). > !after.png! Throughput with a queue limit of 10 records (the spikes are > checkpoints, where the queue is still flushed completely) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9374) Flink Kinesis Producer does not backpressure
[ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9374: -- Labels: pull-request-available (was: ) > Flink Kinesis Producer does not backpressure > > > Key: FLINK-9374 > URL: https://issues.apache.org/jira/browse/FLINK-9374 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Franz Thoma >Priority: Critical > Labels: pull-request-available > Attachments: after.png, before.png > > > The {{FlinkKinesisProducer}} just accepts records and forwards them to a > {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL > internally holds an unbounded queue of records that have not yet been sent. > Since Kinesis is rate-limited to 1MB per second per shard, this queue may > grow indefinitely if Flink sends records faster than the KPL can forward them > to Kinesis. > One way to circumvent this problem is to set a record TTL, so that queued > records are dropped after a certain amount of time, but this will lead to > data loss under high loads. > Currently the only time the queue is flushed is during checkpointing: > {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a > checkpoint is reached (and will wait until the queue is flushed), or until > out-of-memory, whichever is reached first. (This gets worse due to the fact > that the Java KPL is only a thin wrapper around a C++ process, so it is not > even the Java process that runs out of memory, but the C++ process.) The > implicit rate-limit due to checkpointing leads to a ragged throughput graph > like this (the periods with zero throughput are the wait times before a > checkpoint): > !before.png! Throughput limited by checkpointing only > My proposed solution is to add a config option {{queueLimit}} to set a > maximum number of records that may be waiting in the KPL queue. If this limit > is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and > wait (blocking) until the queue length is below the limit again. This > automatically leads to backpressuring, since the {{FlinkKinesisProducer}} > cannot accept records while waiting. For compatibility, {{queueLimit}} is set > to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a > client explicitly sets the value. Setting a »sane« default value is not > possible unfortunately, since sensible values for the limit depend on the > record size (the limit should be chosen so that about 10–100MB of records per > shard are accumulated before flushing, otherwise the maximum Kinesis > throughput may not be reached). > !after.png! Throughput with a queue limit of 10 records (the spikes are > checkpoints, where the queue is still flushed completely) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...
Github user gliu6 commented on a diff in the pull request: https://github.com/apache/flink/pull/6021#discussion_r196940135 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws Exception { } } + /** +* If the internal queue of the {@link KinesisProducer} gets too long, +* flush some of the records until we are below the limit again. +* We don't want to flush _all_ records at this point since that would +* break record aggregation. +*/ + private void enforceQueueLimit() { --- End diff -- I wonder whether we could adjust the queue limit dynamically. You mentioned that `queue limit = (number of shards * queue size per shard) / record size`. Except for the record size, all the others are relatively easy to set. For me, I don't really know the record size until the application starts. Also, what if the record size varies over time? So how about adding a queueLimit supplier function here to allow the user to supply how the queueLimit is calculated dynamically? ---
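A minimal sketch of the dynamic limit the reviewer suggests, combining the moving-average idea from the earlier comment with the `queue limit = (number of shards * queue size per shard) / record size` formula; all names and the smoothing factor are illustrative assumptions, not code from the PR:

import java.nio.ByteBuffer;

// Hypothetical sketch: track an exponential moving average of record sizes
// observed in invoke(), and derive the queue limit from it on the fly.
private double avgRecordSizeBytes;

private int dynamicQueueLimit(int numberOfShards, long queueBytesPerShard, ByteBuffer serialized) {
	int recordSize = serialized.remaining();
	// 0.1 smoothing factor is an arbitrary illustrative choice;
	// the first record seeds the average directly
	avgRecordSizeBytes = (avgRecordSizeBytes == 0)
		? recordSize
		: 0.9 * avgRecordSizeBytes + 0.1 * recordSize;
	// queue limit = (number of shards * queue size per shard) / record size
	return (int) Math.max(1, (numberOfShards * queueBytesPerShard) / avgRecordSizeBytes);
}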
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518360#comment-16518360 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196868266 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; 
+import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; +
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196868266 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests 
for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + + @BeforeClass + public static void setup() throws Exception { + Configuration config = new Configuration(); + config.setInteger(RestOptions.PORT, 0); + config.setString(RestOptions.ADDRESS, "localhost");
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518333#comment-16518333 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196849558 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; 
+import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; +
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196849558 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests 
for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + + @BeforeClass + public static void setup() throws Exception { + Configuration config = new Configuration(); + config.setInteger(RestOptions.PORT, 0); + config.setString(RestOptions.ADDRESS, "localhost");
[jira] [Commented] (FLINK-8795) Scala shell broken for Flip6
[ https://issues.apache.org/jira/browse/FLINK-8795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518316#comment-16518316 ] ASF GitHub Bot commented on FLINK-8795: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6182 Thanks for the fix @dawidwys! The changes LGTM on my side, +1.

> Scala shell broken for Flip6
> 
> Key: FLINK-8795
> URL: https://issues.apache.org/jira/browse/FLINK-8795
> Project: Flink
> Issue Type: Bug
> Reporter: kant kodali
> Assignee: Dawid Wysakowicz
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
> For various reasons I am trying to run the simple code below after building everything from Flink's GitHub master branch. I get the exception below, and I wonder: what runs on port 9065, and how do I fix this exception?
> I followed the instructions from the Flink master branch, so I did the following.
> {code:java}
> git clone https://github.com/apache/flink.git
> cd flink
> mvn clean package -DskipTests
> cd build-target
> ./bin/start-scala-shell.sh local{code}
> {{And here is the code I ran}}
> {code:java}
> val dataStream = senv.fromElements(1, 2, 3, 4)
> dataStream.countWindowAll(2).sum(0).print()
> senv.execute("My streaming program"){code}
> {{And I finally get this exception}}
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$18(RestClusterClient.java:306)
> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$222(RestClient.java:196)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException: java.net.ConnectException: Connection refused: localhost/127.0.0.1:9065
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 16 more
> Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:9065
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281){code}
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8795) Scala shell broken for Flip6
[ https://issues.apache.org/jira/browse/FLINK-8795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8795: -- Labels: pull-request-available (was: )

> Scala shell broken for Flip6
> 
> Key: FLINK-8795
> URL: https://issues.apache.org/jira/browse/FLINK-8795
> Project: Flink
> Issue Type: Bug
> Reporter: kant kodali
> Assignee: Dawid Wysakowicz
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
> For various reasons I am trying to run the simple code below after building everything from Flink's GitHub master branch. I get the exception below, and I wonder: what runs on port 9065, and how do I fix this exception?
> I followed the instructions from the Flink master branch, so I did the following.
> {code:java}
> git clone https://github.com/apache/flink.git
> cd flink
> mvn clean package -DskipTests
> cd build-target
> ./bin/start-scala-shell.sh local{code}
> {{And here is the code I ran}}
> {code:java}
> val dataStream = senv.fromElements(1, 2, 3, 4)
> dataStream.countWindowAll(2).sum(0).print()
> senv.execute("My streaming program"){code}
> {{And I finally get this exception}}
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$18(RestClusterClient.java:306)
> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$222(RestClient.java:196)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException: java.net.ConnectException: Connection refused: localhost/127.0.0.1:9065
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 16 more
> Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:9065
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281){code}
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6182: [FLINK-8795] Fixed local scala shell for Flip6
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6182 Thanks for the fix @dawidwys! The changes LGTM on my side, +1. ---
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518311#comment-16518311 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196836698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. + */ +public final class FileUploads implements AutoCloseable { + private final Collection<Path> directoriesToClean; + private final Collection<Path> uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException { + final Collection<Path> files = new ArrayList<>(4); + final Collection<Path> directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.getContainedFiles()); + } else { + files.add(fileOrDirectory); + } --- End diff -- we don't have to, it's for testing convenience as noted in the class javadocs. 
> Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196836698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. + */ +public final class FileUploads implements AutoCloseable { + private final Collection<Path> directoriesToClean; + private final Collection<Path> uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException { + final Collection<Path> files = new ArrayList<>(4); + final Collection<Path> directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.getContainedFiles()); + } else { + files.add(fileOrDirectory); + } --- End diff -- we don't have to, it's for testing convenience as noted in the class javadocs. ---
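A hypothetical usage sketch of the class in the diff above: since FileUploads implements AutoCloseable, a handler can scope cleanup with try-with-resources. The getUploadedFiles() getter, the paths, and handleUploadedFile() are assumptions for illustration, not part of the quoted diff:

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

// Hypothetical sketch: consume the uploaded files, then let close() delete
// the per-request upload directory recorded in directoriesToClean.
void consumeUploads() throws Exception {
	try (FileUploads uploads = new FileUploads(
			Collections.singleton(Paths.get("/tmp/flink-web-upload/some-request-id")))) {
		for (Path file : uploads.getUploadedFiles()) { // assumed getter for the uploadedFiles field
			handleUploadedFile(file); // hypothetical handler method
		}
	}
}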
[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196836575 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) { @Override protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { - if (msg instanceof HttpRequest) { - final HttpRequest httpRequest = (HttpRequest) msg; - if (httpRequest.getMethod().equals(HttpMethod.POST)) { - if (HttpPostRequestDecoder.isMultipart(httpRequest)) { - currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); - currentHttpRequest = httpRequest; + try { + if (msg instanceof HttpRequest) { + final HttpRequest httpRequest = (HttpRequest) msg; + LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); + if (httpRequest.getMethod().equals(HttpMethod.POST)) { + if (HttpPostRequestDecoder.isMultipart(httpRequest)) { + currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); + currentHttpRequest = httpRequest; + currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString())); + } else { + ctx.fireChannelRead(msg); + } } else { ctx.fireChannelRead(msg); } + } else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) { + // make sure that we still have a upload dir in case that it got deleted in the meanwhile + RestServerEndpoint.createUploadDir(uploadDir, LOG); + + final HttpContent httpContent = (HttpContent) msg; + currentHttpPostRequestDecoder.offer(httpContent); + + while (currentHttpPostRequestDecoder.hasNext()) { + final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); + if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { + final DiskFileUpload fileUpload = (DiskFileUpload) data; + checkState(fileUpload.isCompleted()); + + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); + fileUpload.renameTo(dest.toFile()); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}.", data.getName()); + HandlerUtils.sendErrorResponse( + ctx, + currentHttpRequest, + new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'), + HttpResponseStatus.BAD_REQUEST, + Collections.emptyMap() + ); + deleteUploadedFiles(); + reset(); + return; + } + } + } + +
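For context, the handler above expects a multipart POST that mixes a JSON attribute with file parts. A hand-rolled client sketch using only the JDK, assuming the attribute name "request" and the endpoint URL shown (both assumptions; the real counterpart is the Netty-based RestClient extension):

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class MixedMultipartClientSketch {

	public static void main(String[] args) throws IOException {
		String boundary = "----flink-upload-" + System.nanoTime();
		HttpURLConnection conn = (HttpURLConnection)
			new URL("http://localhost:8081/upload").openConnection(); // assumed endpoint
		conn.setDoOutput(true);
		conn.setRequestMethod("POST");
		conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);

		Path jar = Paths.get("job.jar"); // assumed local file
		try (OutputStream out = conn.getOutputStream()) {
			// Part 1: the JSON payload, under the attribute name the handler looks for (assumed "request").
			out.write(("--" + boundary + "\r\n"
				+ "Content-Disposition: form-data; name=\"request\"\r\n\r\n"
				+ "{\"parallelism\":1}\r\n").getBytes(StandardCharsets.UTF_8));
			// Part 2: the file upload.
			out.write(("--" + boundary + "\r\n"
				+ "Content-Disposition: form-data; name=\"jar\"; filename=\"" + jar.getFileName() + "\"\r\n"
				+ "Content-Type: application/octet-stream\r\n\r\n").getBytes(StandardCharsets.UTF_8));
			Files.copy(jar, out);
			out.write(("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));
		}
		System.out.println("HTTP " + conn.getResponseCode());
	}
}
{code}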
[jira] [Updated] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9599: -- Labels: pull-request-available (was: ) > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize the mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming HTTP request with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518274#comment-16518274 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196827863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- Oh sorry, my bad, I'm misunderstand... > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > TTL state decorator uses original state with packed TTL and add TTL logic > using time provider: > {code:java} > TtlValueState implements ValueState { > ValueState> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. 
update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
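The pseudocode in the issue description above is schematic. A self-contained sketch of the same decorator idea, with Flink's state and time-service interfaces replaced by simple stand-ins (a {{LongSupplier}} clock and a plain field instead of {{ValueState}}; names are illustrative):

{code:java}
import java.util.function.LongSupplier;

final class TtlValue<V> {
	final V userValue;
	final long lastUpdateTs;

	TtlValue(V userValue, long lastUpdateTs) {
		this.userValue = userValue;
		this.lastUpdateTs = lastUpdateTs;
	}
}

final class TtlValueStateSketch<V> {
	private TtlValue<V> underlying; // stands in for ValueState<TtlValue<V>>
	private final LongSupplier timeProvider;
	private final long ttlMillis;

	TtlValueStateSketch(LongSupplier timeProvider, long ttlMillis) {
		this.timeProvider = timeProvider;
		this.ttlMillis = ttlMillis;
	}

	V value() {
		TtlValue<V> withTtl = underlying;
		// TTL logic: treat absent or expired entries as null
		if (withTtl == null || withTtl.lastUpdateTs + ttlMillis <= timeProvider.getAsLong()) {
			return null;
		}
		return withTtl.userValue;
	}

	void update(V value) {
		underlying = new TtlValue<>(value, timeProvider.getAsLong()); // pack value with a fresh timestamp
	}
}
{code}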
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518267#comment-16518267 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196826339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- The `ttlValue` is changed before the lambda from return statement so it is not effectively immutable any more to be used in lambda, that is why `finalResult` is formally needed to avoid compilation error. 
> Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
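The compiler rule behind this exchange (the precise term is "effectively final"): a local variable captured by a lambda must never be reassigned after initialization, and copying it into a fresh final local is the standard workaround. In isolation:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class EffectivelyFinalDemo {

	static Supplier<List<String>> makeSupplier(List<String> input) {
		List<String> result = input;
		if (result == null) {
			result = Collections.emptyList(); // reassignment: 'result' is no longer effectively final
		}
		// return () -> result;              // does not compile: captured variable must be (effectively) final
		final List<String> finalResult = result; // fresh, never-reassigned copy is capturable
		return () -> finalResult;
	}
}
{code}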
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518244#comment-16518244 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196820474 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + long currentTs = timeProvider.currentTimestamp(); + long ttl = config.getTtl().toMilliseconds(); --- End diff -- This will be called a lot often, so does it make sense to introduce a field to remember the `config.getTtl().toMilliseconds()`? 
> Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
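The change sihuazhou suggests, sketched on simplified types (field and method names assumed): convert the TTL to milliseconds once in the constructor rather than on every expiration-timestamp computation.

{code:java}
import java.util.function.LongSupplier;

final class TtlTimestampSketch {
	private final long ttlMillis; // cached once, e.g. from config.getTtl().toMilliseconds()
	private final LongSupplier timeProvider;

	TtlTimestampSketch(long ttlMillis, LongSupplier timeProvider) {
		this.ttlMillis = ttlMillis;
		this.timeProvider = timeProvider;
	}

	long newExpirationTimestamp() {
		return timeProvider.getAsLong() + ttlMillis; // hot path: no repeated time-to-millis conversion
	}
}
{code}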
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518243#comment-16518243 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196809755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds(); --- End diff -- This will be called a lot often, so does it make sense to introduce a field to remember the `config.getTtl().toMilliseconds()`? 
> Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518241#comment-16518241 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196817846 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- The var `finalResult` looks like redundant or I'm misunderstand. 
> Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196816259 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? 
Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { + return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + } + + @Override + public void updateInternal(List valueToStore) throws Exception { + Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null."); + original.addAll(withTs(valueToStore)); --- End diff -- This seems to miss a `clear()`. ---
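The fix implied by the last remark: replacing the stored list must drop the old entries before appending, otherwise {{updateInternal}} degenerates into {{addAll}}. A sketch against a plain list standing in for the wrapped state:

{code:java}
import java.util.ArrayList;
import java.util.List;

final class ReplaceVsAppendSketch<T> {

	private final List<T> original = new ArrayList<>(); // stands in for the wrapped list state

	// Buggy variant: appending only, so stale elements survive the "update".
	void updateInternalBuggy(List<T> valuesToStore) {
		original.addAll(valuesToStore);
	}

	// Fixed variant: clear first, so the state is actually replaced.
	void updateInternalFixed(List<T> valuesToStore) {
		original.clear();
		original.addAll(valuesToStore);
	}
}
{code}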
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518240#comment-16518240 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196819320 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- In general Flink allows to operate in negative range for event time, but the overflow in case of very big TTL should be checked. TTL makes sense only non-negative. I have added fix for it. 
> Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
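One way to implement the overflow check mentioned above, saturating at {{Long.MAX_VALUE}} for very large TTLs (a sketch, not necessarily the committed fix):

{code:java}
final class SaturatingExpiration {

	/** currentTs + ttl, clamped to Long.MAX_VALUE ("never expires") on overflow. */
	static long expirationTimestamp(long currentTs, long ttlMillis) {
		try {
			return Math.addExact(currentTs, ttlMillis);
		} catch (ArithmeticException overflow) {
			return Long.MAX_VALUE;
		}
	}

	public static void main(String[] args) {
		// prints 9223372036854775807 instead of a negative, already-expired timestamp
		System.out.println(expirationTimestamp(System.currentTimeMillis(), Long.MAX_VALUE));
	}
}
{code}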
[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure
[ https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Prem Santosh updated FLINK-9598: Description: We have set the config Minimum Pause Between Checkpoints to be 10 min but noticed that when a checkpoint fails (because it times out before it completes) the application immediately starts taking the next checkpoint. This basically stalls the application's progress since it's always taking checkpoints. [Here|^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue. (Sorry about the imgur link, I was not able to attach a screenshot on Jira because of some error) Details: * Running Flink-1.3.2 on EMR * checkpoint timeout duration: 40 min * minimum pause between checkpoints: 10 min There is also a [relevant thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] that I found on the Flink users group. was: We have set the config Minimum Pause Between Checkpoints to be 10 min but noticed that when a checkpoint fails (because it times out before it completes) the application immediately starts taking the next checkpoint. This basically stalls the application's progress since it's always taking checkpoints. [#Here] is a screenshot of this issue. (Sorry about the imgur link, I was not able to attach a screenshot on Jira because of some error) Details: * Running Flink-1.3.2 on EMR * checkpoint timeout duration: 40 min * minimum pause between checkpoints: 10 min There is also a [relevant thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] that I found on the Flink users group. > [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when > there's a checkpoint failure > - > > Key: FLINK-9598 > URL: https://issues.apache.org/jira/browse/FLINK-9598 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Prem Santosh >Assignee: vinoyang >Priority: Major > Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png > > > We have set the config Minimum Pause Between Checkpoints to be 10 min but > noticed that when a checkpoint fails (because it times out before it > completes) the application immediately starts taking the next checkpoint. > This basically stalls the application's progress since it's always taking > checkpoints. > [Here|^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this > issue. (Sorry about the imgur link, I was not able to attach a screenshot on > Jira because of some error) > Details: > * Running Flink-1.3.2 on EMR > * checkpoint timeout duration: 40 min > * minimum pause between checkpoints: 10 min > There is also a [relevant > thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] > that I found on the Flink users group. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure
[ https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Prem Santosh updated FLINK-9598: Description: We have set the config Minimum Pause Between Checkpoints to be 10 min but noticed that when a checkpoint fails (because it times out before it completes) the application immediately starts taking the next checkpoint. This basically stalls the application's progress since it's always taking checkpoints. [^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue. Details: * Running Flink-1.3.2 on EMR * checkpoint timeout duration: 40 min * minimum pause between checkpoints: 10 min There is also a [relevant thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] that I found on the Flink users group. was: We have set the config Minimum Pause Between Checkpoints to be 10 min but noticed that when a checkpoint fails (because it times out before it completes) the application immediately starts taking the next checkpoint. This basically stalls the application's progress since it's always taking checkpoints. [Here|^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue. (Sorry about the imgur link, I was not able to attach a screenshot on Jira because of some error) Details: * Running Flink-1.3.2 on EMR * checkpoint timeout duration: 40 min * minimum pause between checkpoints: 10 min There is also a [relevant thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] that I found on the Flink users group. > [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when > there's a checkpoint failure > - > > Key: FLINK-9598 > URL: https://issues.apache.org/jira/browse/FLINK-9598 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Prem Santosh >Assignee: vinoyang >Priority: Major > Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png > > > We have set the config Minimum Pause Between Checkpoints to be 10 min but > noticed that when a checkpoint fails (because it times out before it > completes) the application immediately starts taking the next checkpoint. > This basically stalls the application's progress since it's always taking > checkpoints. > [^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue. > Details: > * Running Flink-1.3.2 on EMR > * checkpoint timeout duration: 40 min > * minimum pause between checkpoints: 10 min > There is also a [relevant > thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] > that I found on the Flink users group. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure
[ https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Prem Santosh updated FLINK-9598: Description: We have set the config Minimum Pause Between Checkpoints to be 10 min but noticed that when a checkpoint fails (because it times out before it completes) the application immediately starts taking the next checkpoint. This basically stalls the application's progress since it's always taking checkpoints. [#Here] is a screenshot of this issue. (Sorry about the imgur link, I was not able to attach a screenshot on Jira because of some error) Details: * Running Flink-1.3.2 on EMR * checkpoint timeout duration: 40 min * minimum pause between checkpoints: 10 min There is also a [relevant thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] that I found on the Flink users group. was: We have set the config Minimum Pause Between Checkpoints to be 10 min but noticed that when a checkpoint fails (because it times out before it completes) the application immediately starts taking the next checkpoint. This basically stalls the application's progress since it's always taking checkpoints. Here i[s a screenshot of |https://imgur.com/a/z7NMY4H]this issue. (Sorry about the imgur link, I was not able to attach a screenshot on Jira because of some error) Details: * Running Flink-1.3.2 on EMR * checkpoint timeout duration: 40 min * minimum pause between checkpoints: 10 min There is also a [relevant thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] that I found on the Flink users group. > [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when > there's a checkpoint failure > - > > Key: FLINK-9598 > URL: https://issues.apache.org/jira/browse/FLINK-9598 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Prem Santosh >Assignee: vinoyang >Priority: Major > Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png > > > We have set the config Minimum Pause Between Checkpoints to be 10 min but > noticed that when a checkpoint fails (because it times out before it > completes) the application immediately starts taking the next checkpoint. > This basically stalls the application's progress since it's always taking > checkpoints. > [#Here] is a screenshot of this issue. (Sorry about the imgur link, I was not > able to attach a screenshot on Jira because of some error) > Details: > * Running Flink-1.3.2 on EMR > * checkpoint timeout duration: 40 min > * minimum pause between checkpoints: 10 min > There is also a [relevant > thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] > that I found on the Flink users group. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure
[ https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Prem Santosh updated FLINK-9598: Attachment: Screen Shot 2018-06-20 at 7.44.10 AM.png > [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when > there's a checkpoint failure > - > > Key: FLINK-9598 > URL: https://issues.apache.org/jira/browse/FLINK-9598 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Prem Santosh >Assignee: vinoyang >Priority: Major > Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png > > > We have set the config Minimum Pause Between Checkpoints to be 10 min but > noticed that when a checkpoint fails (because it times out before it > completes) the application immediately starts taking the next checkpoint. > This basically stalls the application's progress since it's always taking > checkpoints. > Here i[s a screenshot of |https://imgur.com/a/z7NMY4H]this issue. (Sorry > about the imgur link, I was not able to attach a screenshot on Jira because > of some error) > Details: > * Running Flink-1.3.2 on EMR > * checkpoint timeout duration: 40 min > * minimum pause between checkpoints: 10 min > There is also a [relevant > thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html] > that I found on the Flink users group. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
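For reference, both settings involved in this report are configured on the job's CheckpointConfig. A sketch matching the reporter's values follows; the surrounding job setup is assumed:

{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10 * 60 * 1000);              // checkpoint every 10 min
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointTimeout(40 * 60 * 1000);          // fail checkpoints after 40 min
config.setMinPauseBetweenCheckpoints(10 * 60 * 1000); // the pause this issue reports as skipped after a failure
{code}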
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518200#comment-16518200 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196791555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Why not check for `ttl`? > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with a packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196791555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Why not check for `ttl`? ---
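The reviewer's suggestion would presumably amount to validating the ttl argument alongside the others, along these lines (a sketch, not the committed code):

{code:java}
Preconditions.checkNotNull(ttl, "TTL must not be null");
Preconditions.checkArgument(ttl.toMilliseconds() > 0, "TTL must be positive");
{code}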
[jira] [Commented] (FLINK-9604) Support KafkaProtoBufTableSource
[ https://issues.apache.org/jira/browse/FLINK-9604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518197#comment-16518197 ] Timo Walther commented on FLINK-9604: - Thanks for working on this issue [~mingleizhang]. In the future we want to separate connector and format, such that a Protobuf serialization schema can also be used for other connectors, not only Kafka. Feel free to open a PR for serialization/deserialization schemas that convert to Table API types, similar to what was done in FLINK-8630. > Support KafkaProtoBufTableSource > > > Key: FLINK-9604 > URL: https://issues.apache.org/jira/browse/FLINK-9604 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: mingleizhang >Assignee: mingleizhang >Priority: Major > > Protocol buffers are a language-neutral, platform-neutral extensible > mechanism for serializing structured data. In actual production > applications, Protocol Buffers is often used for serialization and > deserialization, so I would suggest adding this commonly used functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
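As an illustration of the connector/format separation described above, a Protobuf deserialization schema can be written independently of Kafka. This is a simplified, hypothetical sketch (the class does not exist in Flink; it produces the generated message type rather than the Table API Row conversion the comment asks for). Generated Protobuf classes expose a static parseFrom(byte[]) method, which is resolved reflectively here so the schema itself stays serializable:

{code:java}
import com.google.protobuf.Message;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.lang.reflect.Method;

// Hypothetical connector-agnostic Protobuf schema; not an existing Flink class.
public class ProtobufDeserializationSchema<T extends Message> implements DeserializationSchema<T> {

    private final Class<T> messageClass;
    private transient Method parseFrom; // re-resolved after the schema is deserialized on a TaskManager

    public ProtobufDeserializationSchema(Class<T> messageClass) {
        this.messageClass = messageClass;
    }

    @Override
    public T deserialize(byte[] bytes) throws IOException {
        try {
            if (parseFrom == null) {
                parseFrom = messageClass.getMethod("parseFrom", byte[].class);
            }
            return messageClass.cast(parseFrom.invoke(null, (Object) bytes));
        } catch (ReflectiveOperationException e) {
            throw new IOException("Failed to parse Protobuf message", e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false; // unbounded source
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(messageClass);
    }
}
{code}

Because the schema only depends on DeserializationSchema, it could be plugged into Kafka, Kinesis, or any other connector that accepts one.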
[jira] [Created] (FLINK-9628) Options to tolerate truncate failure in BucketingSink
Truong Duc Kien created FLINK-9628: -- Summary: Options to tolerate truncate failure in BucketingSink Key: FLINK-9628 URL: https://issues.apache.org/jira/browse/FLINK-9628 Project: Flink Issue Type: Improvement Affects Versions: 1.5.0 Reporter: Truong Duc Kien When the target filesystem is corrupted, the truncate operation might fail permanently, causing the job to restart repeatedly without making progress. There should be an option to ignore these kinds of errors and allow the program to continue -- This message was sent by Atlassian JIRA (v7.6.3#76005)
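One possible shape for such an option is a toggle that downgrades a permanent truncate failure to a warning. This is purely hypothetical; BucketingSink exposes no such flag at the time of this report, and the field and method names below are invented for illustration:

{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical tolerance option around the truncate performed during restore.
class TruncateHelper {
    private static final Logger LOG = LoggerFactory.getLogger(TruncateHelper.class);

    private final boolean ignoreTruncateFailures; // the proposed toggle

    TruncateHelper(boolean ignoreTruncateFailures) {
        this.ignoreTruncateFailures = ignoreTruncateFailures;
    }

    void safeTruncate(FileSystem fs, Path partFile, long validLength) throws IOException {
        try {
            // On a corrupted filesystem this can fail permanently, restarting the job forever.
            fs.truncate(partFile, validLength);
        } catch (IOException e) {
            if (!ignoreTruncateFailures) {
                throw e; // current behavior
            }
            LOG.warn("Ignoring truncate failure for {} (valid length {})", partFile, validLength, e);
        }
    }
}
{code}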
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518183#comment-16518183 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r196793648 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -162,13 +205,25 @@ public void shutdown(Time timeout) { ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); // create request and set headers - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + final FullHttpRequest httpRequest; + if (multipart) { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl); + } else { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + } + httpRequest.headers() - .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity()) - .add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE) .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + if (!multipart) { + httpRequest.headers() --- End diff -- wasn't sure whether this should also be done by the `RequestProcessor` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518182#comment-16518182 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r196793605 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -162,13 +205,25 @@ public void shutdown(Time timeout) { ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); // create request and set headers - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + final FullHttpRequest httpRequest; + if (multipart) { --- End diff -- wasn't sure whether this should also be done by the `RequestProcessor` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r196793605 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -162,13 +205,25 @@ public void shutdown(Time timeout) { ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); // create request and set headers - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + final FullHttpRequest httpRequest; + if (multipart) { --- End diff -- wasn't sure whether this should also be done by the `RequestProcessor` ---
[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r196793648 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -162,13 +205,25 @@ public void shutdown(Time timeout) { ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); // create request and set headers - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + final FullHttpRequest httpRequest; + if (multipart) { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl); + } else { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + } + httpRequest.headers() - .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity()) - .add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE) .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + if (!multipart) { + httpRequest.headers() --- End diff -- wasn't sure whether this should also be done by the `RequestProcessor` ---
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518180#comment-16518180 ] ASF GitHub Bot commented on FLINK-9599: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6189 [FLINK-9599][rest] RestClient supports FileUploads This PR is based on a squashed #6178. ## What is the purpose of the change This PR extends the `RestClient` to allow sending multipart messages containing files and an optional json payload. @tillrohrmann Regarding the previously discussed issue about `EMPTY_LAST_HTTP_CONTENT`, you can reproduce the issue by reverting the change to the `FileUploadHandler` and running the `RestClientMultipartTest`. ## Brief change log * rework `FileUploadHandlerTest` into an abstract base class, to re-use classes for the `RestClient` ## Verifying this change * add `RequestProcessor` interface for hiding differences between non-/multipart messages * refactor `RestClient#sendRequest` into `internalSendRequest` that allows passing a `RequestProcessor` * see `RestClientMultipartTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9280_gamma Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6189.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6189 commit 97d7eaf2adbc8174025fafd32b3424f05bd27da1 Author: zentol Date: 2018-06-18T08:54:42Z [FLINK-9599][rest] Implement generic mechanism to access uploaded files commit 0ea85f1658c3ceca09a481e27392ed39b955d8bb Author: zentol Date: 2018-06-19T07:45:09Z [FLINK-9599][rest] RestClient supports FileUploads > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6189 [FLINK-9599][rest] RestClient supports FileUploads This PR is based on a squashed #6178. ## What is the purpose of the change This PR extends the `RestClient` to allow sending multipart messages containing files and an optional json payload. @tillrohrmann Regarding the previously discussed issue about `EMPTY_LAST_HTTP_CONTENT`, you can reproduce the issue by reverting the change to the `FileUploadHandler` and running the `RestClientMultipartTest`. ## Brief change log * rework `FileUploadHandlerTest` into an abstract base class, to re-use classes for the `RestClient` ## Verifying this change * add `RequestProcessor` interface for hiding differences between non-/multipart messages * refactor `RestClient#sendRequest` into `internalSendRequest` that allows passing a `RequestProcessor` * see `RestClientMultipartTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9280_gamma Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6189.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6189 commit 97d7eaf2adbc8174025fafd32b3424f05bd27da1 Author: zentol Date: 2018-06-18T08:54:42Z [FLINK-9599][rest] Implement generic mechanism to access uploaded files commit 0ea85f1658c3ceca09a481e27392ed39b955d8bb Author: zentol Date: 2018-06-19T07:45:09Z [FLINK-9599][rest] RestClient supports FileUploads ---
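For context on the review comments above: for multipart requests, the body plus the Content-Type (with boundary) and Content-Length headers are produced by the multipart encoder rather than set by hand, which is why the client skips setting them itself. With Netty this looks roughly as follows (a sketch using Netty's unshaded classes; Flink actually uses a shaded Netty under org.apache.flink.shaded.netty4):

{code:java}
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;

import java.io.File;

static HttpRequest buildMultipartRequest(File jar)
        throws HttpPostRequestEncoder.ErrorDataEncoderException {
    DefaultFullHttpRequest request =
        new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jars/upload");
    // true = multipart/form-data; the encoder sets Content-Type (with boundary) and Content-Length.
    HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(request, true);
    encoder.addBodyFileUpload("jarfile", jar, "application/java-archive", false);
    // The finalized request is written first, followed by the encoder's content chunks.
    return encoder.finalizeRequest();
}
{code}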
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196786370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +<V> V getUnexpried(TtlValue<V> ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +<V> boolean expired(TtlValue<V> ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- Does it make sense to never expire the value when `ttlValue.getExpirationTimestamp()` returns a negative value? ---
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518155#comment-16518155 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196786370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +<V> V getUnexpried(TtlValue<V> ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +<V> boolean expired(TtlValue<V> ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- Does it make sense to never expire the value when `ttlValue.getExpirationTimestamp()` returns a negative value? > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with a packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518152#comment-16518152 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196784275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +<V> V getUnexpried(TtlValue<V> ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +<V> boolean expired(TtlValue<V> ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` might be negative. E.g. when the user provides `Long.MAX_VALUE` as the TTL value, the expectation is that the value should never expire, but according to the current code, it will expire immediately. > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > The TTL state decorator uses the original state with a packed TTL and adds TTL logic > using a time provider: > {code:java} > TtlValueState<V> implements ValueState<V> { > ValueState<TtlValue<V>> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue<V> valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196784275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +<V> V getUnexpried(TtlValue<V> ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +<V> boolean expired(TtlValue<V> ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` might be negative. E.g. when the user provides `Long.MAX_VALUE` as the TTL value, the expectation is that the value should never expire, but according to the current code, it will expire immediately. ---
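The reviewer's concern can be reproduced with a short illustrative check:

{code:java}
long now = System.currentTimeMillis();
long expiration = now + Long.MAX_VALUE; // overflows and wraps to a large negative value
System.out.println(expiration <= now);  // true: the state would be treated as already expired
{code}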
[jira] [Comment Edited] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518099#comment-16518099 ] xueyu edited comment on FLINK-6846 at 6/20/18 1:36 PM: --- I want to submit a PR for this function; would you mind changing the Assignee to me? Thanks. I have submitted a PR for FLINK-6846 was (Author: xueyu7452): I want to submit a PR for this function; would you mind changing the Assignee to me? Thanks. > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: starter > > See FLINK-6811 for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
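For context, the SQL side of FLINK-6811 follows the standard Calcite TIMESTAMPADD form; FLINK-6846 is about exposing the same semantics as a Table API expression. A sketch of the SQL form invoked from Java follows, where the tableEnv and the registered Orders table with a rowtime column are assumed setup, not part of this issue:

{code:java}
// Illustrative only: TIMESTAMPADD via Flink SQL; the Table API expression
// equivalent is what this issue proposes to add.
Table result = tableEnv.sqlQuery(
    "SELECT TIMESTAMPADD(MINUTE, 10, rowtime) AS later FROM Orders");
{code}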