[jira] [Created] (FLINK-12955) Add HBase LookupableTableSource
Chance Li created FLINK-12955:
------------------------------
Summary: Add HBase LookupableTableSource
Key: FLINK-12955
URL: https://issues.apache.org/jira/browse/FLINK-12955
Project: Flink
Issue Type: New Feature
Reporter: Chance Li

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12833) Add Klaviyo to Chinese PoweredBy page
[ https://issues.apache.org/jira/browse/FLINK-12833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-12833:
----------------------------
Issue Type: Sub-task (was: Task)
Parent: FLINK-11526

> Add Klaviyo to Chinese PoweredBy page
> -------------------------------------
>
> Key: FLINK-12833
> URL: https://issues.apache.org/jira/browse/FLINK-12833
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Project Website
> Reporter: Fabian Hueske
> Priority: Major
>
> Commit b54ecfa930653bcfecd60df3414deca5291c6cb3 added Klaviyo to the English
> PoweredBy page.
> It should be added to the Chinese page as well.
[GitHub] [flink] wuchong commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
wuchong commented on issue #8695: [FLINK-12805][FLINK-12808][FLINK-12809][table-api] Introduce PartitionableTableSource and PartitionableTableSink and OverwritableTableSink
URL: https://github.com/apache/flink/pull/8695#issuecomment-504865125

Hi @aljoscha, do you have time to have a look again?

----
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Updated] (FLINK-12930) Update Chinese "how to contribute" pages
[ https://issues.apache.org/jira/browse/FLINK-12930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-12930:
----------------------------
Issue Type: Sub-task (was: Task)
Parent: FLINK-11526

> Update Chinese "how to contribute" pages
> ----------------------------------------
>
> Key: FLINK-12930
> URL: https://issues.apache.org/jira/browse/FLINK-12930
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Project Website
> Reporter: Robert Metzger
> Priority: Major
>
> FLINK-12605 updated the "How to contribute" pages. Thus, we need to update
> the Chinese translation of those pages.
[GitHub] [flink] asfgit closed pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType
asfgit closed pull request #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType
URL: https://github.com/apache/flink/pull/8730
[jira] [Closed] (FLINK-12834) Support CharType and BinaryType in blink runner
[ https://issues.apache.org/jira/browse/FLINK-12834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-12834.
---------------------------
Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: 1ee24d6c626e8d361354721ad3de41d18e33bb70

> Support CharType and BinaryType in blink runner
> -----------------------------------------------
>
> Key: FLINK-12834
> URL: https://issues.apache.org/jira/browse/FLINK-12834
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> 1. Now we use the LogicalType VarcharType to support the Calcite char type.
> 2. The Table API subsequently also generates LogicalType's CharType.
> We need real support for CharType in internal code gen and computation.
[GitHub] [flink] wuchong commented on issue #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType
wuchong commented on issue #8730: [FLINK-12834][table-planner-blink] Support CharType and BinaryType
URL: https://github.com/apache/flink/pull/8730#issuecomment-504862280

@flinkbot approve all
[jira] [Created] (FLINK-12954) Supports watermark and create view for sql parser
Danny Chan created FLINK-12954:
-------------------------------
Summary: Supports watermark and create view for sql parser
Key: FLINK-12954
URL: https://issues.apache.org/jira/browse/FLINK-12954
Project: Flink
Issue Type: Sub-task
Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: Danny Chan
Assignee: Danny Chan
Fix For: 1.9.0

Add watermark support to CREATE TABLE; also add CREATE VIEW grammar.
[GitHub] [flink] becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
URL: https://github.com/apache/flink/pull/8776#discussion_r296513114

File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java

@@ -25,16 +25,58 @@
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;

 /**
  * The map-like container class for parameter. This class is provided to unify the interaction with
  * parameters.
  */
 @PublicEvolving
-public class Params implements Serializable {
-	private final Map paramMap = new HashMap<>();
+public class Params implements Serializable, Cloneable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * A mapping from param name to its value.
+	 *
+	 * The value is stored in map using json format.
+	 */
+	private final Map params;
+
+	private transient ObjectMapper mapper;
+
+	public Params() {
+		this.params = new HashMap<>();
+	}
+
+	/**
+	 * Return the number of params.
+	 *
+	 * @return Return the number of params.
+	 */
+	public int size() {
+		return params.size();
+	}
+
+	/**
+	 * Removes all of the params.
+	 * The params will be empty after this call returns.
+	 */
+	public void clear() {
+		params.clear();
+	}
+
+	/**
+	 * Returns true if this params contains no mappings.
+	 *
+	 * @return true if this map contains no mappings
+	 */
+	public boolean isEmpty() {
+		return params.isEmpty();
+	}

 	/**
 	 * Returns the value of the specific parameter, or default value defined in the {@code info} if

Review comment: I moved some of the java doc from ParamInfo to here. See if it is clearer.

```
/**
 * Returns the value of the specific parameter, or default value defined in the {@code info} if
 * this Params doesn't have a value set for the parameter.
 An exception will be thrown in the
 * following cases because no value could be found for the specified parameter.
 *
 *   - Non-optional parameter: no value is defined in this params for a non-optional parameter.
 *   - Optional parameter: no value is defined in this params and no default value is defined.
 *
 * @param info the info of the specific parameter, usually with default value
 * @param <V> the type of the specific parameter
 * @return the value of the specific parameter, or default value defined in the {@code info} if
 *         this Params doesn't contain the parameter
 * @throws IllegalArgumentException if no value can be found for specified parameter
 */
```
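The lookup order described in the suggested javadoc can be sketched with a small, dependency-free stand-in. `ParamDef` and `SimpleParams` below are hypothetical names, not the actual Flink `ParamInfo`/`Params` classes; the sketch only mirrors the documented contract: an explicit value wins, then the default of an optional parameter, otherwise an `IllegalArgumentException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ParamInfo: name, optionality, and an optional default.
final class ParamDef<V> {
    final String name;
    final boolean optional;
    final boolean hasDefault;
    final V defaultValue;

    ParamDef(String name, boolean optional, boolean hasDefault, V defaultValue) {
        this.name = name;
        this.optional = optional;
        this.hasDefault = hasDefault;
        this.defaultValue = defaultValue;
    }
}

// Hypothetical stand-in for Params, implementing only the get() contract above.
final class SimpleParams {
    private final Map<String, Object> values = new HashMap<>();

    <V> void set(ParamDef<V> def, V value) {
        values.put(def.name, value);
    }

    // Explicit value wins; then an optional parameter's default; otherwise throw.
    @SuppressWarnings("unchecked")
    <V> V get(ParamDef<V> def) {
        if (values.containsKey(def.name)) {
            return (V) values.get(def.name);
        }
        if (def.optional && def.hasDefault) {
            return def.defaultValue;
        }
        throw new IllegalArgumentException("No value found for parameter " + def.name);
    }
}
```

Both failure cases in the javadoc (non-optional with no value, optional with no value and no default) collapse into the final `throw` here.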
[GitHub] [flink] becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
URL: https://github.com/apache/flink/pull/8776#discussion_r296509197

File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfo.java

@@ -19,12 +19,25 @@
 package org.apache.flink.ml.api.misc.param;

 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;

 /**
  * Definition of a parameter, including name, type, default value, validator and so on.
  *
  * This class is provided to unify the interaction with parameters.
  *
+ * when isOptional is true, contain(ParamInfo) is true, it will return the value found in Params,

Review comment: This class is only about definition. It might be better to keep the semantics of `contains()` and `get()` in their own method java doc. How about just mentioning the following here? Also, should we throw an exception if a default value is specified for a non-optional parameter?

```
/**
 * Definition of a parameter, including name, type, default value, validator and so on.
 *
 * A parameter can either be optional or non-optional.
 *
 *   - A non-optional parameter should not have a default value. Instead, its value must be
 *     provided by the users.
 *   - An optional parameter may or may not have a default value.
 *
 * Please see {@link Params#get(ParamInfo)} and {@link Params#contains(ParamInfo)} for more
 * details about the behavior.
 *
 * A parameter may have aliases in addition to the parameter name for convenience and
 * compatibility purposes. One should not set values for both parameter name and an alias.
 * One and only one value should be set either under the parameter name or one of the alias.
 *
 * @param <V> the type of the param value
 */
```
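The alias rule quoted in the suggested javadoc ("one and only one value should be set either under the parameter name or one of the alias") can be sketched as follows. `AliasedParams` is a hypothetical stand-in, not Flink's `Params`; it stores raw string values and enforces uniqueness at lookup time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a value may live under the parameter name or under one
// of its aliases, but never under more than one of those keys at once.
final class AliasedParams {
    private final Map<String, String> values = new HashMap<>();

    void set(String key, String value) {
        values.put(key, value);
    }

    // Resolves a parameter by its name or any alias, enforcing that exactly
    // one of those keys carries a value.
    String get(String name, List<String> aliases) {
        List<String> keys = new ArrayList<>(aliases);
        keys.add(0, name);

        String result = null;
        int hits = 0;
        for (String key : keys) {
            if (values.containsKey(key)) {
                result = values.get(key);
                hits++;
            }
        }
        if (hits != 1) {
            throw new IllegalStateException(
                "Expected exactly one value for '" + name + "' but found " + hits);
        }
        return result;
    }
}
```

Setting a value under both the name and an alias then surfaces as an error on the next read, which matches the "one and only one" wording of the proposal.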
[GitHub] [flink] becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
URL: https://github.com/apache/flink/pull/8776#discussion_r296524614

File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java

@@ -93,18 +148,20 @@
 	 * @param the type of the specific parameter
 	 */
 	public void remove(ParamInfo info) {
-		paramMap.remove(info.getName());
+		params.remove(info.getName());
+		for (String a : info.getAlias()) {
+			params.remove(a);
+		}
 	}

 	/**
-	 * Creates and returns a deep clone of this Params.
+	 * Returns true if this params has the specified paramInfo.
 	 *
-	 * @return a deep clone of this Params
+	 * @return true if this params has the specified paramInfo.
 	 */
-	public Params clone() {
-		Params newParams = new Params();
-		newParams.paramMap.putAll(this.paramMap);
-		return newParams;
+	public boolean contains(ParamInfo paramInfo) {
+		return params.containsKey(paramInfo.getName()) ||

Review comment: This behavior is different from the java doc. The java doc in `ParamInfo` said the following: "when isOptional is true, contain(ParamInfo) is true, it will return the value found in Params, ..."
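The `remove()` change in the diff above clears the value stored under the parameter's name and under every alias. A minimal standalone sketch, with `ParamStore` as a hypothetical stand-in for the string-keyed map inside `Params`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the new remove(ParamInfo) behavior: drop the value
// under the parameter name and under all of its aliases.
final class ParamStore {
    private final Map<String, String> params = new HashMap<>();

    void set(String key, String value) {
        params.put(key, value);
    }

    void remove(String name, List<String> aliases) {
        params.remove(name);
        for (String a : aliases) {
            params.remove(a);
        }
    }

    boolean containsKey(String key) {
        return params.containsKey(key);
    }
}
```

Without the alias loop, a value accidentally set under an alias would survive `remove()` and still be visible to later lookups, which is the gap the diff closes.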
[GitHub] [flink] becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
URL: https://github.com/apache/flink/pull/8776#discussion_r296523650

File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java

@@ -93,18 +148,20 @@
 	 * @param the type of the specific parameter
 	 */
 	public void remove(ParamInfo info) {
-		paramMap.remove(info.getName());
+		params.remove(info.getName());
+		for (String a : info.getAlias()) {
+			params.remove(a);
+		}
 	}

 	/**
-	 * Creates and returns a deep clone of this Params.
+	 * Returns true if this params has the specified paramInfo.

Review comment: I moved some of the java doc from `ParamInfo` here. See if it is clearer.

```
/**
 * Check whether a value is set for the given {@code paramInfo}. A value is considered set in
 * the following cases:
 *
 *   - The given parameter has a value set in this Params, or
 *   - The given parameter is optional and has a default value defined in the {@code paramInfo}.
 *
 * If this method returns false, {@link #get(ParamInfo)} will throw an exception.
 *
 * @return true if this params has a value set for the specified {@code paramInfo}, false otherwise.
 */
```
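The `contains()` contract proposed in that javadoc, a value counts as "set" if it was given explicitly or if the parameter is optional with a default, reduces to a one-line predicate. `ContainsCheck` is a hypothetical illustration, not the Flink API:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed contains() semantics.
final class ContainsCheck {
    private final Set<String> explicitlySet = new HashSet<>();

    void set(String name) {
        explicitlySet.add(name);
    }

    // "Set" means: explicitly given, or optional with a default value defined.
    boolean contains(String name, boolean optional, boolean hasDefault) {
        return explicitlySet.contains(name) || (optional && hasDefault);
    }
}
```

This also makes the documented invariant easy to see: whenever `contains()` is false, a `get()` implemented as in the earlier comment has nothing to return and must throw.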
[GitHub] [flink] becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
becketqin commented on a change in pull request #8776: [FLINK-12881][ml] Add more functionalities for ML Params and ParamInfo class
URL: https://github.com/apache/flink/pull/8776#discussion_r296518189

File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java

@@ -93,18 +148,20 @@
 	 * @param the type of the specific parameter
 	 */
 	public void remove(ParamInfo info) {
-		paramMap.remove(info.getName());
+		params.remove(info.getName());
+		for (String a : info.getAlias()) {
+			params.remove(a);
+		}
 	}

 	/**
-	 * Creates and returns a deep clone of this Params.
+	 * Returns true if this params has the specified paramInfo.
 	 *
-	 * @return a deep clone of this Params
+	 * @return true if this params has the specified paramInfo.
 	 */
-	public Params clone() {
-		Params newParams = new Params();
-		newParams.paramMap.putAll(this.paramMap);
-		return newParams;
+	public boolean contains(ParamInfo paramInfo) {

Review comment: The argument name is `info` everywhere else. Can we just use `info` instead of `paramInfo` here?
[GitHub] [flink] tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#discussion_r296549276

File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/VoidTriggerable.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.runtime;
+
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.Triggerable;
+
+/**
+ * A {@link Triggerable} that does nothing.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+public final class VoidTriggerable<K, N> implements Triggerable<K, N> {

Review comment: Add `@Internal`.
[GitHub] [flink] tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#discussion_r296548435

File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/BufferingCollector.java

@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.input;
+
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * A collector that buffers elements in memory to be pulled latter by downstream operators.

Review comment: typo: `latter` -> `later`
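The collector pattern described in that javadoc, collect into an in-memory queue and drain it later, can be sketched without any Flink dependency. `SimpleBufferingCollector` below is an illustrative stand-in, not the actual `BufferingCollector` class:

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Hypothetical sketch: collect() enqueues elements; a downstream consumer
// pulls them later through the Iterator interface, in FIFO order.
final class SimpleBufferingCollector<T> implements Iterator<T> {
    private final Queue<T> buffer = new ArrayDeque<>();

    public void collect(T element) {
        buffer.add(element);
    }

    @Override
    public boolean hasNext() {
        return !buffer.isEmpty();
    }

    @Override
    public T next() {
        return buffer.poll();
    }
}
```

An `ArrayDeque` is a natural fit here: FIFO semantics, no synchronization overhead, and no capacity bound beyond memory, which matches "buffers elements in memory".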
[GitHub] [flink] tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state URL: https://github.com/apache/flink/pull/8618#discussion_r296548675 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java ## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.state.api.functions.KeyedStateReaderFunction; +import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit; +import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService; +import org.apache.flink.state.api.runtime.SavepointEnvironment; +import org.apache.flink.state.api.runtime.SavepointRuntimeContext; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +/** + * Input format for reading partitioned state. + * + * @param The type of the key. 
+ * @param The type of the output of the {@link KeyedStateReaderFunction}. + */ +@Internal +public class KeyedStateInputFormat extends SavepointInputFormat implements KeyContext { + + private static final String USER_TIMERS_NAME = "user-timers"; + + private final StateBackend stateBackend; + + private final TypeInformation keyType; + + private final KeyedStateReaderFunction userFunction; + + private transient TypeSerializer keySerializer; + + private transient CloseableRegistry registry; + + private transient BufferingCollector out; + + private transient Iterator keys; + + private transient AbstractKeyedStateBackend keyedStateBackend; + + private transient Context ctx; + + /** +* Creates an input format for reading partitioned state from an operator in a savepoint. +* +* @param savepointPath The path to an existing savepoint. +* @param uid The uid of an operator. +* @param stateBackend The state backed used to snapshot the operator. +* @param keyType The type information describing the key type. +* @param userFunction The {@link KeyedStateReaderFunction} called for each key in the operator. +*/ + public KeyedStateInputFormat( + String savepointPath, + String uid, + StateBackend stateBackend, + TypeInformation keyType, + KeyedStateReaderFunction userFunction) { + super(savepointPath, uid); + this.stateBackend = stateBackend; + this.keyType = keyType; + this.userFunction = userFunction; + } + + @Override + public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws
[GitHub] [flink] tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state URL: https://github.com/apache/flink/pull/8618#discussion_r296548546 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java ## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.state.api.functions.KeyedStateReaderFunction; +import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit; +import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService; +import org.apache.flink.state.api.runtime.SavepointEnvironment; +import org.apache.flink.state.api.runtime.SavepointRuntimeContext; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +/** + * Input format for reading partitioned state. + * + * @param The type of the key. 
+ * @param The type of the output of the {@link KeyedStateReaderFunction}. + */ +@Internal +public class KeyedStateInputFormat extends SavepointInputFormat implements KeyContext { + + private static final String USER_TIMERS_NAME = "user-timers"; + + private final StateBackend stateBackend; + + private final TypeInformation keyType; + + private final KeyedStateReaderFunction userFunction; + + private transient TypeSerializer keySerializer; + + private transient CloseableRegistry registry; + + private transient BufferingCollector out; + + private transient Iterator keys; + + private transient AbstractKeyedStateBackend keyedStateBackend; + + private transient Context ctx; + + /** +* Creates an input format for reading partitioned state from an operator in a savepoint. +* +* @param savepointPath The path to an existing savepoint. +* @param uid The uid of an operator. +* @param stateBackend The state backed used to snapshot the operator. +* @param keyType The type information describing the key type. +* @param userFunction The {@link KeyedStateReaderFunction} called for each key in the operator. +*/ + public KeyedStateInputFormat( + String savepointPath, + String uid, + StateBackend stateBackend, + TypeInformation keyType, + KeyedStateReaderFunction userFunction) { + super(savepointPath, uid); + this.stateBackend = stateBackend; + this.keyType = keyType; + this.userFunction = userFunction; + } + + @Override + public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws
[GitHub] [flink] tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
URL: https://github.com/apache/flink/pull/8618#discussion_r296548329

File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java

@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function that processes keys from a restored operator
+ *
+ * For every key {@link #readKey(Object, Context, Collector)} is invoked. This can produce zero or more
+ * elements as output.
+ *
+ * NOTE: State descriptors must be eagerly registered in {@code open(Configuration)}. Any
+ * attempt to dynamically register states inside of {@code readKey} will result in a {@code
+ * RuntimeException}.
+ *
+ * NOTE: A {@code KeyedStateReaderFunction} is always a {@link
+ * org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link
+ * org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown
+ * methods can be implemented. See {@link
+ * org.apache.flink.api.common.functions.RichFunction#open(Configuration)} and {@link
+ * org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <K> Type of the keys
+ * @param <OUT> Type of the output elements.
+ */
+@PublicEvolving
+public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
+
+	/**
+	 * Initialization method for the function. It is called before {@link #readKey(Object,
+	 * Context, Collector)} and thus suitable for one time setup work.
+	 *
+	 * This is the only method that my register state descriptors within a {@code

Review comment: typo: only method that `may` ...
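The "eager registration in open()" rule from that javadoc, register state descriptors during setup, fail on any later attempt, can be sketched with a tiny state machine. `ReaderContext` is a hypothetical illustration of the mechanism, not Flink's actual `SavepointRuntimeContext`:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: registration is allowed only while open() runs; once
// the open phase ends, further registration attempts throw a RuntimeException,
// mirroring the contract documented for KeyedStateReaderFunction.
final class ReaderContext {
    private final Set<String> registeredStates = new HashSet<>();
    private boolean inOpenPhase = true;

    void registerState(String descriptorName) {
        if (!inOpenPhase) {
            throw new RuntimeException(
                "State descriptors must be eagerly registered in open()");
        }
        registeredStates.add(descriptorName);
    }

    // Called by the runtime once open() returns; registration is sealed.
    void finishOpen() {
        inOpenPhase = false;
    }

    boolean isRegistered(String descriptorName) {
        return registeredStates.contains(descriptorName);
    }
}
```

Sealing after `open()` is what lets the input format know the complete set of states up front, before any `readKey()` call touches the restored backend.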
[GitHub] [flink] tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state
tzulitai commented on a change in pull request #8618: [FLINK-12732][state-processor-api] Add savepoint reader for consuming partitioned operator state URL: https://github.com/apache/flink/pull/8618#discussion_r296549170 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java ## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.state.api.functions.KeyedStateReaderFunction; +import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit; +import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService; +import org.apache.flink.state.api.runtime.SavepointEnvironment; +import org.apache.flink.state.api.runtime.SavepointRuntimeContext; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +/** + * Input format for reading partitioned state. + * + * @param <K> The type of the key. 
+ * @param <OUT> The type of the output of the {@link KeyedStateReaderFunction}. + */ +@Internal +public class KeyedStateInputFormat<K, OUT> extends SavepointInputFormat<OUT, KeyGroupRangeInputSplit> implements KeyContext { + + private static final String USER_TIMERS_NAME = "user-timers"; + + private final StateBackend stateBackend; + + private final TypeInformation<K> keyType; + + private final KeyedStateReaderFunction<K, OUT> userFunction; + + private transient TypeSerializer<K> keySerializer; + + private transient CloseableRegistry registry; + + private transient BufferingCollector<OUT> out; + + private transient Iterator<K> keys; + + private transient AbstractKeyedStateBackend<K> keyedStateBackend; + + private transient Context ctx; + + /** +* Creates an input format for reading partitioned state from an operator in a savepoint. +* +* @param savepointPath The path to an existing savepoint. +* @param uid The uid of an operator. +* @param stateBackend The state backend used to snapshot the operator. +* @param keyType The type information describing the key type. +* @param userFunction The {@link KeyedStateReaderFunction} called for each key in the operator. +*/ + public KeyedStateInputFormat( + String savepointPath, + String uid, + StateBackend stateBackend, + TypeInformation<K> keyType, + KeyedStateReaderFunction<K, OUT> userFunction) { + super(savepointPath, uid); + this.stateBackend = stateBackend; + this.keyType = keyType; + this.userFunction = userFunction; + } + + @Override + public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws
[GitHub] [flink] gaoyunhaii commented on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on issue #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#issuecomment-504851196 Hi @NicoK , sorry, I was kept occupied by some urgent internal matters last week. I have split the original commit into four smaller ones and rebased them onto the latest master.
[GitHub] [flink] chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase
chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase URL: https://github.com/apache/flink/pull/7550#discussion_r296544775 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/util/HBaseUtils.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase.util; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import org.apache.hadoop.hbase.util.Bytes; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** + * Suite of utility methods for HBase. + */ +public class HBaseUtils { + + public static byte[] serialize(TypeInformation<?> typeInfo, Object obj) { + Class<?> clazz = typeInfo.getTypeClass(); + if (byte[].class.equals(clazz)) { + return (byte[]) obj; + } else if (String.class.equals(clazz)) { + return Bytes.toBytes((String) obj); Review comment: it should support specifying a charset.
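The review point above can be illustrated in plain Java: HBase's `Bytes.toBytes(String)` always encodes UTF-8, so a charset-aware serializer would have to take the charset as an explicit parameter. `serializeString` below is a hypothetical helper (no HBase dependency), not the PR's API.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Plain-Java illustration of the charset concern raised in the review above:
// the same string yields different byte arrays under different charsets, so
// the charset must be a parameter rather than implicitly fixed to UTF-8.
public class CharsetSerialize {

    static byte[] serializeString(String s, Charset charset) {
        return s.getBytes(charset);
    }

    public static void main(String[] args) {
        String s = "état"; // non-ASCII content is where the choice of charset matters
        System.out.println(serializeString(s, StandardCharsets.UTF_8).length);      // 5 ('é' takes two bytes in UTF-8)
        System.out.println(serializeString(s, StandardCharsets.ISO_8859_1).length); // 4
    }
}
```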
[GitHub] [flink] chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase
chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase URL: https://github.com/apache/flink/pull/7550#discussion_r296544664 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseUpsertTableSinkFactory.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.HBaseValidator; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_CF_QUALIFIER_DELIMITER; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_CLUSTER_KEY; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ROW_KEY; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; + +/** + * Table factory for creating an {@link org.apache.flink.table.sinks.UpsertStreamTableSink} for HBase. 
+ */ +@Internal +public class HBaseUpsertTableSinkFactory implements StreamTableSinkFactory<Tuple2<Boolean, Row>> { + + public static final String DEFAULT_CF_QUALIFIER_DELIMITER = ":"; + + @Override public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) { + final DescriptorProperties descriptorProperties = getValidatedProperties(properties); + final String delimiter = descriptorProperties.getOptionalString(CONNECTOR_CF_QUALIFIER_DELIMITER) + .orElse(DEFAULT_CF_QUALIFIER_DELIMITER); + return new HBaseUpsertTableSink( + descriptorProperties.isValue(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()), + descriptorProperties.getTableSchema(SCHEMA()), + descriptorProperties.getString(CONNECTOR_CLUSTER_KEY), + descriptorProperties.getString(CONNECTOR_TABLE_NAME), + getUserConfig(descriptorProperties), + descriptorProperties.getString(CONNECTOR_ROW_KEY), + delimiter + ); + } + + @Override public Map<String, String> requiredContext() { + final Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE); + context.put(CONNECTOR_PROPERTY_VERSION, "1"); + return context; + } + + @Override public List<String> supportedProperties() { + final List<String> properties = new ArrayList<>(); + + // streaming properties + properties.add(UPDATE_MODE()); Review comment: is () a typo?
[GitHub] [flink] chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase
chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase URL: https://github.com/apache/flink/pull/7550#discussion_r296544376 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseSinkFunctionBase.java ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.hbase.util.HBaseUtils; + +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * HBaseSinkFunctionBase is the common abstract class of {@link HBasePojoSinkFunction}, {@link HBaseTupleSinkFunction}, + * {@link HBaseScalaProductSinkFunction} and {@link HBaseUpsertTableSink.HBaseUpsertSinkFunction}. 
+ * + * @param <IN> Type of the elements emitted by this sink + */ +public abstract class HBaseSinkFunctionBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction { + + private static final Logger log = LoggerFactory.getLogger(HBaseSinkFunctionBase.class); + + // + // Internal bulk processor configuration + // + + public static final String CONFIG_KEY_BATCH_FLUSH_ENABLE = "connector.batch-flush.enable"; + public static final String CONFIG_KEY_BATCH_FLUSH_MAX_MUTATIONS = "connector.batch-flush.max-mutations"; + public static final String CONFIG_KEY_BATCH_FLUSH_MAX_SIZE_MB = "connector.batch-flush.max-size.mb"; + public static final String CONFIG_KEY_BATCH_FLUSH_INTERVAL_MS = "connector.batch-flush.interval.ms"; + public static final String CONFIG_KEY_BATCH_FLUSH_MAX_RETRIES = "connector.batch-flush.backoff.max-retries"; + public static final String CONFIG_KEY_BATCH_FLUSH_MAX_TIMEOUT_MS = "connector.batch-flush.max-timeout"; + + private final Map<String, String> userConfig; + + protected final int rowKeyIndex; + protected final String[] fieldNames; + protected final TypeInformation<?>[] fieldTypes; + protected final String[] columnFamilies; + protected final String[] qualifiers; + protected final int[] fieldElementIndexMapping; + + /** The timer that triggers periodic flush to HBase. */ + private ScheduledThreadPoolExecutor executor; + + private ExecutorService flushExecutor; + + /** The lock to safeguard the flush commits. */ + private transient Object lock; + + private Connection connection; + private transient Table hTable; + private boolean isBuildingTable = false; + + private HBaseClientWrapper client; + private List<Mutation> mutaionBuffer = new LinkedList<>(); + private long estimateSize = 0; + + private final boolean batchFlushEnable; + private long
[GitHub] [flink] chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase
chancelq commented on a change in pull request #7550: [FLINK-10245] [Streaming Connector] Add Pojo, Tuple, Row and Scala Product DataStream Sink and Upsert Table Sink for HBase URL: https://github.com/apache/flink/pull/7550#discussion_r296543945 ## File path: flink-connectors/flink-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClientWrapper.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * This class is used to configure a {@link Connection} and a {@link Table} after deployment. + * The connection represents the connection that will be established to HBase. + * The table represents a table that can be manipulated in HBase. 
+ */ +public class HBaseClientWrapper implements Serializable { + + private Map<String, String> configurationMap = new HashMap<>(); Review comment: I think it should also support loading properties from hbase-site.xml.
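For reference, HBase's own configuration mechanism already covers the suggestion above: `HBaseConfiguration.create()` loads `hbase-site.xml` from the classpath, so cluster properties would not need to be passed through `configurationMap` by hand. A minimal `hbase-site.xml` might look like the fragment below (all host names and values are placeholders):

```xml
<?xml version="1.0"?>
<configuration>
  <!-- placeholder values; picked up from the classpath by HBaseConfiguration.create() -->
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk1.example.com,zk2.example.com</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>
```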
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296537584 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.state.api.input.BroadcastStateInputFormat; +import org.apache.flink.state.api.input.ListStateInputFormat; +import org.apache.flink.state.api.input.UnionStateInputFormat; + +/** + * An existing savepoint. + */ +@PublicEvolving +@SuppressWarnings("WeakerAccess") +public class ExistingSavepoint { + private final ExecutionEnvironment env; + + private final String existingSavepoint; + + private final StateBackend stateBackend; + + ExistingSavepoint(ExecutionEnvironment env, String path, StateBackend stateBackend) { + this.env = env; + this.existingSavepoint = path; + this.stateBackend = stateBackend; Review comment: nit: missing `Preconditions.checkNotNull` checks
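The nit above, null-checking constructor arguments, is conventionally done in Flink with `Preconditions.checkNotNull`. The stdlib equivalent `Objects.requireNonNull` shows the same fail-fast pattern without Flink on the classpath; `SavepointHandle` below is a stand-in class for illustration, not the PR's `ExistingSavepoint`.

```java
import java.util.Objects;

// Stdlib sketch of the null-check nit above: fail fast in the constructor
// rather than with a late NullPointerException deep inside job execution.
// Flink's Preconditions.checkNotNull behaves like Objects.requireNonNull here.
public class SavepointHandle {
    private final String path;
    private final String backendName;

    SavepointHandle(String path, String backendName) {
        this.path = Objects.requireNonNull(path, "path must not be null");
        this.backendName = Objects.requireNonNull(backendName, "backendName must not be null");
    }

    String describe() {
        return backendName + "@" + path;
    }

    public static void main(String[] args) {
        // A null argument is rejected immediately with a descriptive message.
        System.out.println(new SavepointHandle("hdfs:///savepoints/sp-1", "rocksdb").describe());
    }
}
```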
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296539566 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/SavepointInputFormat.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.state.api.runtime.OperatorIDGenerator; +import org.apache.flink.state.api.runtime.SavepointLoader; + +import java.io.IOException; + +/** + * Base input format for reading state from {@link Savepoint}s. 
+ * + * @param <OT> The type of the produced records. + * @param <T> The type of input split. + */ +abstract class SavepointInputFormat<OT, T extends InputSplit> extends RichInputFormat<OT, T> { + private final String savepointPath; + + protected final String uid; + + @SuppressWarnings("WeakerAccess") + protected final OperatorID operatorID; + + SavepointInputFormat(String savepointPath, String uid) { + this.savepointPath = savepointPath; + this.uid = uid; + this.operatorID = OperatorIDGenerator.fromUid(uid); + } + + @Override + public void configure(Configuration parameters) { + + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return cachedStatistics; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(T[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + /** +* Finds the {@link OperatorState} for a uid within a savepoint. +* +* @return A handle to the operator state in the savepoint with the provided uid. +* @throws IOException If the savepoint path is invalid or the uid does not exist +*/ + OperatorState getOperatorState() throws IOException { + final Savepoint savepoint = SavepointLoader.loadSavepoint(savepointPath); Review comment: I've been thinking whether or not this operation can be done earlier, when loading an `ExistingSavepoint`, and maintained by the `ExistingSavepoint` class. Not entirely sure though, as that would require access to the DFS on the client side; not sure how feasible that is in practice. Not too much of a deal, as this shouldn't be a heavy workload.
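The `getOperatorState()` lookup discussed above boils down to "scan the savepoint's operator states for one matching the uid-derived id, or fail with an `IOException`". A Flink-free sketch of that logic, with `OperatorStateStub` mocking flink-runtime's `OperatorState` purely for illustration:

```java
import java.io.IOException;
import java.util.List;

// Flink-free sketch of the getOperatorState() lookup quoted above: a linear
// scan over the savepoint's operator states, failing with IOException when
// the requested uid-derived id does not exist in the savepoint.
public class OperatorLookup {

    record OperatorStateStub(String operatorId) {}

    static OperatorStateStub getOperatorState(List<OperatorStateStub> states, String operatorId)
            throws IOException {
        return states.stream()
                .filter(s -> s.operatorId().equals(operatorId))
                .findFirst()
                .orElseThrow(() -> new IOException("savepoint contains no state for id " + operatorId));
    }

    public static void main(String[] args) throws IOException {
        List<OperatorStateStub> states =
                List.of(new OperatorStateStub("op-a"), new OperatorStateStub("op-b"));
        System.out.println(getOperatorState(states, "op-b").operatorId()); // op-b
    }
}
```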
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296539972 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.Checkpoints; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage; + +import java.io.DataInputStream; +import java.io.IOException; + +/** + * Utility class for loading {@link Savepoint} metadata. + */ +@Internal +public final class SavepointLoader { + private SavepointLoader() {} + + /** +* Takes the given string (representing a pointer to a checkpoint) and resolves it to a file +* status for the checkpoint's metadata file. +* +* @param savepointPath The path to an external savepoint. 
+* @return A state handle to savepoint's metadata. +* @throws IOException Thrown, if the path cannot be resolved, the file system not accessed, or +* the path points to a location that does not seem to be a savepoint. +*/ + public static Savepoint loadSavepoint(String savepointPath) throws IOException { + CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage + .resolveCheckpointPointer(savepointPath); + + try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) { + return Checkpoints.loadCheckpointMetadata(stream, NullClassLoader.INSTANCE); Review comment: In the future, once we add type information of state into the savepoint metadata file, I don't think this `NullClassLoader` is correct anymore. We would potentially need the actual user classloader, since the metadata would contain user classes (e.g. user implemented `TypeSerializerSnapshot`s)
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296538634 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.jobgraph.OperatorInstanceID; +import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.state.api.input.splits.OperatorStateInputSplit; +import org.apache.flink.streaming.api.operators.BackendRestorerProcedure; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.IOUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * The base input format for reading operator state from a {@link + * org.apache.flink.runtime.checkpoint.savepoint.Savepoint}. + * + * @param The type of the input. + */ +abstract class OperatorStateInputFormat extends SavepointInputFormat { Review comment: `@Internal` annotation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296540299 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NullClassLoader.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.runtime; + +import java.io.InputStream; +import java.net.URL; +import java.util.Enumeration; + +/** + * A classloader that does not work. + */ +public class NullClassLoader extends ClassLoader { Review comment: Add `@Internal`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
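A "classloader that does not work" can be sketched in a few lines of plain Java: every load attempt throws, which makes any accidental classloading through it fail loudly. This matches the patch's current assumption that no user classes appear in the metadata (the concern raised above); the class and constant names here are illustrative, not Flink's:

```java
// Minimal sketch of a deliberately non-functional class loader.
final class FailingClassLoader extends ClassLoader {
    static final FailingClassLoader INSTANCE = new FailingClassLoader();

    private FailingClassLoader() {}

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // Refuse even parent delegation, so nothing can resolve through this loader.
        throw new ClassNotFoundException("FailingClassLoader refuses to load: " + name);
    }
}
```

Once type information is written into the savepoint metadata, this loader would have to be replaced by the actual user classloader, as the reviewer notes.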
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296538424 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/BroadcastStateInputFormat.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.OperatorStateBackend; + +import java.util.Map; +import java.util.stream.StreamSupport; + +/** + * The input format for reading {@link org.apache.flink.api.common.state.BroadcastState}. + * + * @param The type of the keys in the {@code BroadcastState}. + * @param The type of the values in the {@code BroadcastState}. + */ +@Internal +public class BroadcastStateInputFormat extends OperatorStateInputFormat> { + private final MapStateDescriptor descriptor; + + /** +* Creates an input format for reading broadcast state from an operator in a savepoint. 
+* +* @param savepointPath The path to an existing savepoint. +* @param uid The uid of a particular operator. +* @param descriptor The descriptor for this state, providing a name and serializer. +*/ + public BroadcastStateInputFormat(String savepointPath, String uid, MapStateDescriptor<K, V> descriptor) { + super(savepointPath, uid, true); + this.descriptor = descriptor; Review comment: Please check all other constructors for this problem; I think the issue occurs in multiple constructors.
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296537903 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.state.api.input.BroadcastStateInputFormat; +import org.apache.flink.state.api.input.ListStateInputFormat; +import org.apache.flink.state.api.input.UnionStateInputFormat; + +/** + * An existing savepoint. + */ +@PublicEvolving +@SuppressWarnings("WeakerAccess") +public class ExistingSavepoint { + private final ExecutionEnvironment env; + + private final String existingSavepoint; + + private final StateBackend stateBackend; + + ExistingSavepoint(ExecutionEnvironment env, String path, StateBackend stateBackend) { + this.env = env; + this.existingSavepoint = path; + this.stateBackend = stateBackend; + } + + /** +* Read operator {@code ListState} from a {@code Savepoint}. +* @param uid The uid of the operator. +* @param name The (unique) name for the state. +* @param typeInfo The type of the elements in the state. +* @param The type of the values that are in the list state. +* @return A {@code DataSet} representing the elements in state. 
+*/ + public DataSet readListState(String uid, String name, TypeInformation typeInfo) { + ListStateDescriptor descriptor = new ListStateDescriptor<>(name, typeInfo); + ListStateInputFormat inputFormat = new ListStateInputFormat<>(existingSavepoint, uid, descriptor); + return env.createInput(inputFormat, typeInfo); + } + + /** +* Read operator {@code ListState} from a {@code Savepoint} when a +* custom serializer was used; e.g., a different serializer than the +* one returned by {@code TypeInformation#createSerializer}. +* @param uid The uid of the operator. +* @param name The (unique) name for the state. +* @param typeInfo The type of the elements in the state. +* @param serializer The serializer used to write the elements into state. +* @param The type of the values that are in the list state. +* @return A {@code DataSet} representing the elements in state. +*/ + public DataSet readListState( + String uid, + String name, + TypeInformation typeInfo, + TypeSerializer serializer) { Review comment: As a follow-up, we can probably think about a variant where the user simply passes in a `TypeSerializer` and no `TypeInformation`. In this case, can we just wrap the given serializer into a "dummy" type info? Not entirely sure what methods of the `TypeInformation` will be used in the batch processing API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
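The follow-up idea above (accept only a `TypeSerializer` and wrap it in a "dummy" type info) can be illustrated with hypothetical stand-in types; none of these names exist in Flink, they only model the delegation shape:

```java
// Hypothetical stand-ins for Flink's TypeSerializer / TypeInformation, used to
// sketch "wrap an explicit serializer into a dummy type info".
interface SerializerStub<T> {
    byte[] serialize(T value);
}

final class SerializerBackedTypeInfoStub<T> {
    private final SerializerStub<T> serializer;

    SerializerBackedTypeInfoStub(SerializerStub<T> serializer) {
        this.serializer = serializer;
    }

    // The only capability the dummy type info needs to provide: hand back the
    // user-supplied serializer instead of deriving one from type analysis.
    SerializerStub<T> createSerializer() {
        return serializer;
    }
}
```

Whether this suffices depends on which other `TypeInformation` methods the batch API actually invokes, which is exactly the open question in the review comment.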
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296539201 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.jobgraph.OperatorInstanceID; +import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.state.api.input.splits.OperatorStateInputSplit; +import org.apache.flink.streaming.api.operators.BackendRestorerProcedure; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.IOUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * The base input format for reading operator state from a {@link + * org.apache.flink.runtime.checkpoint.savepoint.Savepoint}. + * + * @param The type of the input. 
+ */ +abstract class OperatorStateInputFormat extends SavepointInputFormat { + + private final boolean isUnionType; + + private transient OperatorStateBackend restoredBackend; + + private transient CloseableRegistry registry; + + private transient Iterator elements; + + OperatorStateInputFormat(String savepointPath, String uid, boolean isUnionType) { + super(savepointPath, uid); + + this.isUnionType = isUnionType; + } + + protected abstract Iterable getElements(OperatorStateBackend restoredBackend) throws Exception; + + public OperatorStateInputSplit[] createInputSplits(int minNumSplits) throws IOException { + OperatorStateInputSplit[] splits = getOperatorStateInputSplits(minNumSplits); + + if (isUnionType) { + return subPartitionSingleSplit(minNumSplits, splits); + } else { + return splits; + } + } + + private OperatorStateInputSplit[] subPartitionSingleSplit(int minNumSplits, OperatorStateInputSplit[] splits) { + if (splits.length == 0) { + return splits; + } + + // We only want to output a single instance of the union state so we only need + // to transform a single input split. An arbitrary split is chosen and + // sub-partitioned for better data distribution across the cluster. + return CollectionUtil.mapWithIndex( + CollectionUtil.partition(splits[0].getPrioritizedManagedOperatorState().get(0).asList(), minNumSplits), + (state, index) -> new OperatorStateInputSplit(new StateObjectCollection<>(new ArrayList<>(state)), index) + ).toArray(OperatorStateInputSplit[]::new); + } + + private OperatorStateInputSplit[] getOperatorStateInputSplits(int minNumSplits) throws IOException { + final OperatorState operatorState = getOperatorState(); + + final Map> newManagedOperatorStates = new HashMap<>(); + final Map> newRawOperatorStates = new HashMap<>(); Review comment: nit: maybe add an "unused" to the variable naming, to indicate that this isn't used at all. This is an automated message from the Apache Git Service. To respond to the
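The union-state branch above takes one arbitrary split and sub-partitions its state handles across up to `minNumSplits` new splits for better cluster-wide distribution. A plain-Java round-robin sketch of that partitioning idea (Flink's `CollectionUtil.partition` uses its own scheme; this is only illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Spread the elements of a single split round-robin over up to n new groups.
class SplitPartitionSketch {
    static <T> List<List<T>> partition(List<T> elements, int n) {
        int groups = Math.max(1, Math.min(n, Math.max(elements.size(), 1)));
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            result.get(i % groups).add(elements.get(i)); // round-robin assignment
        }
        return result;
    }
}
```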
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r290665994 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/Savepoint.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.StateBackend; + +/** + * A {@link Savepoint} is a collection of operator states that can be used to supply initial state + * when starting a {@link org.apache.flink.streaming.api.datastream.DataStream} job. + */ +@PublicEvolving +public final class Savepoint { + + private Savepoint() {} + + /** +* Loads an existing savepoint. Useful if you want to query, modify, or extend +* the state of an existing application. +* +* @param env The execution enviornment used to transform the savepoint. +* @param path The path to an existing savepoint on disk. +* @param stateBackend The state backend of the savepoint used for keyed state. 
+*/ + public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) { Review comment: Missing `@return` in method Javadoc.
[GitHub] [flink] tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state
tzulitai commented on a change in pull request #8615: [FLINK-12729][state-processor-api] Add state reader for consuming non-partitioned operator state URL: https://github.com/apache/flink/pull/8615#discussion_r296537994 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/BroadcastStateInputFormat.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api.input; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.OperatorStateBackend; + +import java.util.Map; +import java.util.stream.StreamSupport; + +/** + * The input format for reading {@link org.apache.flink.api.common.state.BroadcastState}. + * + * @param The type of the keys in the {@code BroadcastState}. + * @param The type of the values in the {@code BroadcastState}. + */ +@Internal +public class BroadcastStateInputFormat extends OperatorStateInputFormat> { + private final MapStateDescriptor descriptor; + + /** +* Creates an input format for reading broadcast state from an operator in a savepoint. 
+* +* @param savepointPath The path to an existing savepoint. +* @param uid The uid of a particular operator. +* @param descriptor The descriptor for this state, providing a name and serializer. +*/ + public BroadcastStateInputFormat(String savepointPath, String uid, MapStateDescriptor<K, V> descriptor) { + super(savepointPath, uid, true); + this.descriptor = descriptor; Review comment: nit: missing `Preconditions.checkNotNull()` check.
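The requested fail-fast null check can be sketched with the JDK's `Objects.requireNonNull`, which behaves analogously to Flink's `Preconditions.checkNotNull`; the class name and placeholder field type here are illustrative:

```java
import java.util.Objects;

// Sketch of the fail-fast null check the reviewer asks for in the constructor.
class NullCheckedFormatSketch {
    private final Object descriptor;

    NullCheckedFormatSketch(Object descriptor) {
        // Throw an NPE with a descriptive message at construction time, rather
        // than an anonymous NPE later when the descriptor is first used.
        this.descriptor = Objects.requireNonNull(descriptor, "descriptor must not be null");
    }

    Object descriptor() {
        return descriptor;
    }
}
```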
[jira] [Closed] (FLINK-11560) Translate "Flink Applications" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-11560. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in flink-web: 7e6322b301dd8eb7344cd315354d76df28cc9eb8 > Translate "Flink Applications" page into Chinese > > > Key: FLINK-11560 > URL: https://issues.apache.org/jira/browse/FLINK-11560 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Jark Wu >Assignee: Zhou Yumin >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Translate "Flink Applications" page into Chinese. > The markdown file is located in: flink-web/flink-applications.zh.md > The url link is: https://flink.apache.org/zh/flink-applications.html > Please adjust the links in the page to Chinese pages when translating. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8846: [FLINK-12761][runtime] Dynamically allocate TaskExecutor's managed memory to slots.
flinkbot commented on issue #8846: [FLINK-12761][runtime] Dynamically allocate TaskExecutor's managed memory to slots. URL: https://github.com/apache/flink/pull/8846#issuecomment-504841622 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12761) Fine grained resource management
[ https://issues.apache.org/jira/browse/FLINK-12761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12761: --- Labels: Umbrella pull-request-available (was: Umbrella) > Fine grained resource management > > > Key: FLINK-12761 > URL: https://issues.apache.org/jira/browse/FLINK-12761 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.8.0, 1.9.0 >Reporter: Xintong Song >Assignee: Xintong Song >Priority: Major > Labels: Umbrella, pull-request-available > Fix For: 1.9.0 > > > This is an umbrella issue for enabling fine grained resource management in > Flink. > Fine grained resource management is a big topic that requires long term > efforts. There are many issues to be addressed and design decisions to be > made, some of which may not be resolved in a short time. Here we propose our > design and implementation plan for the upcoming release 1.9, as well as our > thoughts and ideas on the long term road map on this topic. > A practical short term target is to enable fine grained resource management > for batch SQL jobs only in the upcoming Flink 1.9. This is necessary for > batch operators added from Blink to achieve good performance. > Please find the detailed design and implementation plan in the attached docs. Any > comments and feedback are welcome and appreciated.
[GitHub] [flink] xintongsong opened a new pull request #8846: [FLINK-12761][runtime] Dynamically allocate TaskExecutor's managed memory to slots.
xintongsong opened a new pull request #8846: [FLINK-12761][runtime] Dynamically allocate TaskExecutor's managed memory to slots. URL: https://github.com/apache/flink/pull/8846 ## What is the purpose of the change This pull request is based on #8740. It dynamically allocates the TaskExecutor's managed memory to the slots on it. ## Brief change log - Introduce allocationResourceProfile in task slots denoting dynamically allocated resources. - Introduce bookkeeping of TaskExecutor's available resources in SlotManager. - Dynamically allocate TaskExecutor's managed memory to slots in SlotManager. ## Verifying this change This change added tests and can be verified as follows: - Added SlotManagerTest#testDynamicAssigningManagedMemory that validates that SlotManager properly bookkeeps TaskExecutors' available managed memory and allocates slots accordingly. - Added TaskExecutorTest#testAllocationResourceProfile that validates that TaskExecutor reports and offers slots with the correct allocation resource profiles that the slots were requested with. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
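The bookkeeping this PR describes can be modeled very simply: a task executor's managed memory is a finite budget, and each slot allocation reserves part of it until the slot is freed. A toy sketch (all names are illustrative, not Flink's actual SlotManager API):

```java
// Toy model of dynamically allocating a task executor's managed memory to slots.
class ManagedMemoryBudget {
    private long availableBytes;

    ManagedMemoryBudget(long totalBytes) {
        this.availableBytes = totalBytes;
    }

    /** Try to reserve memory for a slot; returns false if the budget is exhausted. */
    synchronized boolean allocateSlot(long requestedBytes) {
        if (requestedBytes < 0 || requestedBytes > availableBytes) {
            return false;
        }
        availableBytes -= requestedBytes;
        return true;
    }

    /** Return a slot's reservation to the budget when the slot is freed. */
    synchronized void releaseSlot(long bytes) {
        availableBytes += bytes;
    }

    synchronized long available() {
        return availableBytes;
    }
}
```

The real SlotManager additionally has to track which TaskExecutor each reservation belongs to and match slot requests against per-executor budgets.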
[GitHub] [flink] zjuwangg commented on a change in pull request #8809: [FLINK-12663]Implement HiveTableSource to read Hive tables
zjuwangg commented on a change in pull request #8809: [FLIN-12663]Implement HiveTableSource to read Hive tables URL: https://github.com/apache/flink/pull/8809#discussion_r296540440 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory; +import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.sources.InputFormatTableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.types.Row; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Date; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * HiveTableSource used in tableApi/Sql environment to read data from hive table. + */ +public class HiveTableSource extends InputFormatTableSource { + + private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class); + + private final TableSchema tableSchema; + private final JobConf jobConf; + private final String dbName; + private final String tableName; + private final Boolean isPartitionTable; + private final String[] partitionColNames; Review comment: Why use List is better than String[]? Is there a benefit by doing so? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
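One common argument in reviews like the `List` vs. `String[]` question above: a `List` field can be stored as a defensive, unmodifiable copy, whereas an array field stays mutable through any reference that escapes. A hedged sketch (class and field names are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Defensive-copy idiom that List enables and a raw array does not.
class PartitionColumns {
    private final List<String> columnNames;

    PartitionColumns(List<String> columnNames) {
        // Copy, then wrap: later mutation of the caller's list, or of the
        // returned view, cannot change this object's state.
        this.columnNames = Collections.unmodifiableList(new ArrayList<>(columnNames));
    }

    List<String> columnNames() {
        return columnNames;
    }
}
```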
[GitHub] [flink] zjuwangg commented on a change in pull request #8809: [FLINK-12663] Implement HiveTableSource to read Hive tables
zjuwangg commented on a change in pull request #8809: [FLINK-12663] Implement HiveTableSource to read Hive tables URL: https://github.com/apache/flink/pull/8809#discussion_r296540470

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java

@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HiveTableSource used in tableApi/Sql environment to read data from hive table.
+ */
+public class HiveTableSource extends InputFormatTableSource {
+
+	private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);
+
+	private final TableSchema tableSchema;
+	private final JobConf jobConf;
+	private final String dbName;
+	private final String tableName;
+	private final Boolean isPartitionTable;
+	private final String[] partitionColNames;
+	private List allPartitions;
+	private String hiveVersion;
+
+	public HiveTableSource(TableSchema tableSchema,
+			JobConf jobConf,
+			String dbName,
+			String tableName,
+			String[] partitionColNames) {

Review comment: as above
[jira] [Updated] (FLINK-12766) Dynamically allocate TaskExecutor’s managed memory to slots.
[ https://issues.apache.org/jira/browse/FLINK-12766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-12766: - Description: This step is a temporary workaround for release 1.9 to meet the basic usability requirements of batch functions from Blink. > Dynamically allocate TaskExecutor’s managed memory to slots. > > > Key: FLINK-12766 > URL: https://issues.apache.org/jira/browse/FLINK-12766 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.8.0, 1.9.0 >Reporter: Xintong Song >Assignee: Xintong Song >Priority: Major > Fix For: 1.9.0 > > > This step is a temporary workaround for release 1.9 to meet the basic > usability requirements of batch functions from Blink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12766) Dynamically allocate TaskExecutor’s managed memory to slots.
[ https://issues.apache.org/jira/browse/FLINK-12766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-12766: - Summary: Dynamically allocate TaskExecutor’s managed memory to slots. (was: Dynamically allocate TaskExecutor’s managed memory to slots. This step is a temporal workaround for release 1.9 to meet the basic usability requirements of batch functions from Blink.) > Dynamically allocate TaskExecutor’s managed memory to slots. > > > Key: FLINK-12766 > URL: https://issues.apache.org/jira/browse/FLINK-12766 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.8.0, 1.9.0 >Reporter: Xintong Song >Assignee: Xintong Song >Priority: Major > Fix For: 1.9.0 > >
[GitHub] [flink] sunhaibotb edited a comment on issue #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb edited a comment on issue #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput URL: https://github.com/apache/flink/pull/8731#issuecomment-504836392 Travis is green. @pnowojski The later pushes were to re-trigger Travis because it failed for external reasons; those pushes only rebased on master and did not make any code changes.
[GitHub] [flink] sunhaibotb commented on issue #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
sunhaibotb commented on issue #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput URL: https://github.com/apache/flink/pull/8731#issuecomment-504836392 Travis has been green. The later pushes were to re-trigger Travis because it failed for external reasons; those pushes only rebased on master and did not make any code changes.
[jira] [Commented] (FLINK-12909) add try catch when find a unique file name for the spilling channel
[ https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870760#comment-16870760 ] xymaqingxiang commented on FLINK-12909: --- [~rmetzger] A little modification has been made to the description. Please check it. Thank you. > add try catch when find a unique file name for the spilling channel > --- > > Key: FLINK-12909 > URL: https://issues.apache.org/jira/browse/FLINK-12909 > Project: Flink > Issue Type: Bug >Reporter: xymaqingxiang >Priority: Major > > h2. What is the purpose of the change > Catch exceptions thrown due to disk loss, try to find a unique file name for > the spilling channel again. > Modify the createSpillingChannel() method of the > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer > class to solve this problem. >
[GitHub] [flink] Myasuka commented on issue #8845: [FLINK-12353] Collect all japicmp output under tools directory
Myasuka commented on issue #8845: [FLINK-12353] Collect all japicmp output under tools directory URL: https://github.com/apache/flink/pull/8845#issuecomment-504834705 @flinkbot attention @zentol
[jira] [Updated] (FLINK-12909) add try catch when find a unique file name for the spilling channel
[ https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xymaqingxiang updated FLINK-12909: -- Description: h2. What is the purpose of the change Catch exceptions thrown due to disk loss, try to find a unique file name for the spilling channel again. Modify the createSpillingChannel() method of the org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer class to solve this problem. was: h2. What is the purpose of the change Catch exceptions thrown due to disk loss, try to find a unique file name for the spilling channel again. > add try catch when find a unique file name for the spilling channel > --- > > Key: FLINK-12909 > URL: https://issues.apache.org/jira/browse/FLINK-12909 > Project: Flink > Issue Type: Bug >Reporter: xymaqingxiang >Priority: Major > > h2. What is the purpose of the change > Catch exceptions thrown due to disk loss, try to find a unique file name for > the spilling channel again. > Modify the createSpillingChannel() method of the > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer > class to solve this problem. >
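The change described in FLINK-12909 (wrapping the file-name search in a try/catch and retrying) can be sketched roughly as follows. This is an illustrative sketch, not the actual `SpillingAdaptiveSpanningRecordDeserializer` code; the retry limit, file-name scheme, and method shape are assumptions made for the example.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.Random;

public class SpillingChannelSketch {
    private static final int MAX_ATTEMPTS = 10; // assumed retry limit
    private static final Random RND = new Random();

    // Try several candidate names; an IOException on one attempt (e.g. the
    // chosen disk was lost) moves on to the next candidate instead of
    // failing the job immediately.
    public static FileChannel createSpillingChannel(File tempDir) throws IOException {
        IOException lastFailure = null;
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            File candidate = new File(tempDir, RND.nextLong() + ".inputchannel");
            try {
                if (candidate.createNewFile()) {
                    return new RandomAccessFile(candidate, "rw").getChannel();
                }
                // Name collision: loop again with a fresh random name.
            } catch (IOException e) {
                lastFailure = e; // remember the cause, retry with another name
            }
        }
        throw new IOException("Could not find a unique file name for the spilling channel.", lastFailure);
    }
}
```

The key point of the proposal is that a single failed `createNewFile` no longer propagates; only exhausting all candidates surfaces an `IOException`, with the last underlying failure attached as the cause.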
[GitHub] [flink] flinkbot commented on issue #8845: [FLINK-12353] Collect all japicmp output under tools directory
flinkbot commented on issue #8845: [FLINK-12353] Collect all japicmp output under tools directory URL: https://github.com/apache/flink/pull/8845#issuecomment-504834602 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] wuchong closed pull request #7803: [FLINK-11606][chinese-translation,Documentation] Translate the "Distr…
wuchong closed pull request #7803: [FLINK-11606][chinese-translation,Documentation] Translate the "Distr… URL: https://github.com/apache/flink/pull/7803
[GitHub] [flink] asfgit closed pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page into Chinese
asfgit closed pull request #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page into Chinese URL: https://github.com/apache/flink/pull/8750
[jira] [Closed] (FLINK-11606) Translate the "Distributed Runtime Environment" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-11606. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 058fabedf355f532b9ceb76e9ea7345e0677d71f > Translate the "Distributed Runtime Environment" page into Chinese > - > > Key: FLINK-11606 > URL: https://issues.apache.org/jira/browse/FLINK-11606 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Kevin Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html > The markdown file is located in flink/docs/concepts/runtime.zh.md > The markdown file will be created once FLINK-11529 is merged. > You can reference the translation from : > https://github.com/flink-china/1.6.0/blob/master/concepts/runtime.md
[GitHub] [flink] Myasuka opened a new pull request #8845: [FLINK-12353] Collect all japicmp output under tools
Myasuka opened a new pull request #8845: [FLINK-12353] Collect all japicmp output under tools URL: https://github.com/apache/flink/pull/8845

## What is the purpose of the change

Collect all the output of `japicmp` under the `tools/japicmp-collect` directory instead of running the separate shell script `collect_japicmp_reports.sh` as before.

## Brief change log

- Change the output directory of the `japicmp` plugin to the same `flink-root-dir/tools/japicmp-collect` directory.
- Remove the `collect_japicmp_reports.sh` script.

## Verifying this change

This change is a trivial rework without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**
[jira] [Commented] (FLINK-6500) RowSerialization problem
[ https://issues.apache.org/jira/browse/FLINK-6500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870757#comment-16870757 ] liuzhaokun commented on FLINK-6500: --- [~fhueske] I have the same problem. Is there any solution with the new way you said above?
> RowSerialization problem
>
> Key: FLINK-6500
> URL: https://issues.apache.org/jira/browse/FLINK-6500
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: radu
> Priority: Major
> Labels: error
>
> Relying on implicit types in scala can lead to errors when the arity of the input is different than the output (for Row)
> {code:title=Bar.java|borderStyle=solid}
> /** test row stream registered table **/
> @Test
> def testRowRegisterWithNames(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   StreamITCase.clear
>   val sqlQuery = "SELECT a,c FROM MyTableRow WHERE c < 3"
>   val data = List(
>     Row.of("Hello", "Worlds", Int.box(1)),
>     Row.of("Hello", "Hiden", Int.box(5)),
>     Row.of("Hello again", "Worlds", Int.box(2)))
>   val types = Array[TypeInformation[_]](
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO).toArray
>   val names = Array("a","b","c").toArray
>   implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names) // tpe is automatically
>   val ds = env.fromCollection(data)
>   val t = ds.toTable(tEnv).as('a, 'b, 'c)
>   tEnv.registerTable("MyTableRow", t)
>   val result = tEnv.sql(sqlQuery).toDataStream[Row]
>   result.addSink(new StreamITCase.StringSink)
>   env.execute()
>   val expected = List("Hello,1","Hello again,2")
>   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
> }
> {code}
> will throw a runtime exception:
> testRowRegisterWithNames(org.apache.flink.table.api.scala.stream.sql.SqlITCase) Time elapsed: 0.619 sec <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Row arity of from does not match serializers.
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:43)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at
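The root cause in the trace above is an arity guard: the `RowSerializer` is built for one field count but receives a `Row` with another (here the implicit `RowTypeInfo` declares three fields `a, b, c`, while the query emits only two, `a` and `c`). A stripped-down, Flink-free sketch of that check, for illustration only (`RowSerializer`'s real implementation differs):

```java
public class RowArityCheckSketch {
    // Mimics the guard in RowSerializer#copy: the serializer is constructed
    // for a fixed number of fields and rejects rows with a different arity.
    public static Object[] copy(Object[] from, int serializerArity) {
        if (from.length != serializerArity) {
            throw new RuntimeException("Row arity of from does not match serializers.");
        }
        return from.clone();
    }

    public static void main(String[] args) {
        // The SELECT in the report produces 2 fields (a, c), but the implicit
        // type information was declared with 3 fields (a, b, c).
        try {
            copy(new Object[]{"Hello", 1}, 3);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why the fix discussed on the issue is to declare type information that matches the actual output arity rather than reusing the input's implicit `RowTypeInfo`.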
[jira] [Updated] (FLINK-12353) Collect all modules of japicmp output under the same directory
[ https://issues.apache.org/jira/browse/FLINK-12353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12353: --- Labels: pull-request-available (was: ) > Collect all modules of japicmp output under the same directory > -- > > Key: FLINK-12353 > URL: https://issues.apache.org/jira/browse/FLINK-12353 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > Labels: pull-request-available > > Currently, there are eight modules using japicmp plugin. However, only four > of them would collect japicmp reports in {{collect_japicmp_reports.sh}}. I > have to modify the shell script to collect all reports and therefore I plan > to contribute this change.
[jira] [Updated] (FLINK-12909) add try catch when find a unique file name for the spilling channel
[ https://issues.apache.org/jira/browse/FLINK-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xymaqingxiang updated FLINK-12909: -- Description: h2. What is the purpose of the change Catch exceptions thrown due to disk loss, try to find a unique file name for the spilling channel again. was:add try catch when find a unique file name for the spilling channel > add try catch when find a unique file name for the spilling channel > --- > > Key: FLINK-12909 > URL: https://issues.apache.org/jira/browse/FLINK-12909 > Project: Flink > Issue Type: Bug >Reporter: xymaqingxiang >Priority: Major > > h2. What is the purpose of the change > Catch exceptions thrown due to disk loss, try to find a unique file name for > the spilling channel again.
[jira] [Commented] (FLINK-12953) View logs from Job view in Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-12953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870753#comment-16870753 ] vinoyang commented on FLINK-12953: -- Hi [~chadrik] IMO, currently, user log information should be viewed from the Task Manager, not the Job Manager. Log separation by job has been done on the Task Manager. > View logs from Job view in Web Dashboard > > > Key: FLINK-12953 > URL: https://issues.apache.org/jira/browse/FLINK-12953 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Chad Dombrova >Priority: Major > > As a (beam) developer I want to be able to print/log information from my > custom transforms, and then monitor that output within the job view of the > Web Dashboard, so that I don't have to go hunting through the combined log in > the Job Manager view. The Job Manager log has way too much in it, spanning > all jobs, including output logged by both flink and user code. > A good example of how this UX should work can be found in Google Dataflow: > - click on a job, and see the logged output for that job > - click on a transform, and see the logged output for just that transform > thanks!
[GitHub] [flink] WeiZhong94 commented on issue #8840: [FLINK-12931][python] Fix lint-python.sh cannot find flake8
WeiZhong94 commented on issue #8840: [FLINK-12931][python] Fix lint-python.sh cannot find flake8 URL: https://github.com/apache/flink/pull/8840#issuecomment-504825020 LGTM, +1
[GitHub] [flink] flinkbot commented on issue #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
flinkbot commented on issue #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#issuecomment-504813395 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] danny0405 opened a new pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
danny0405 opened a new pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844

## What is the purpose of the change

This pull request adds logic to convert a SQL DDL to a table source/sink.

## Brief change log

- Add `TableEnvironment#sql()` and `TableEnvironment#registerTable()`
- Add transform logic for all kinds of SQL nodes in SqlExecutableStatements to table Operations
- Add a new table Operation `CreateOperation` for all the DDLs
- Add computed columns/PKs/UKs support for `TableSchema`

## Verifying this change

This change is covered by the test cases `CatalogTableITCase` and `SqlExecutableStatementsTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes, the TableSchema supports PK/UK/Computed Column
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented
[jira] [Updated] (FLINK-12951) Add logic to bridge DDL to table source(sink)
[ https://issues.apache.org/jira/browse/FLINK-12951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12951: --- Labels: pull-request-available (was: ) > Add logic to bridge DDL to table source(sink) > - > > Key: FLINK-12951 > URL: https://issues.apache.org/jira/browse/FLINK-12951 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Danny Chan >Assignee: Danny Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > >
[GitHub] [flink] klion26 commented on issue #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page into Chinese
klion26 commented on issue #8750: [FLINK-11606] Translate the "Distributed Runtime Environment" page into Chinese URL: https://github.com/apache/flink/pull/8750#issuecomment-504811817 LGTM +1
[GitHub] [flink] WeiZhong94 commented on issue #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types
WeiZhong94 commented on issue #8817: [FLINK-12920][python] Drop support of register_table_sink with parameters field_names and field_types URL: https://github.com/apache/flink/pull/8817#issuecomment-504810419 LGTM, +1
[jira] [Updated] (FLINK-12953) View logs from Job view in Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-12953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chad Dombrova updated FLINK-12953: -- Description: As a (beam) developer I want to be able to print/log information from my custom transforms, and then monitor that output within the job view of the Web Dashboard, so that I don't have to go hunting through the combined log in the Job Manager view. The Job Manager log has way too much in it, spanning all jobs, including output logged by both flink and user code. A good example of how this UX should work can be found in Google Dataflow: - click on a job, and see the logged output for that job - click on a transform, and see the logged output for just that transform thanks! was: As a developer I want to be able to log information in my code, and then monitor that output within the job view of the Web Dashboard, so that I don't have to go hunting through the combined log in the Job Manager view. The Job Manager log has way too much data in it, from across all jobs, mixing both output logged by flink and user code. A good example of how this should work can be found in Google Dataflow: - click on a job, and see the logged output for that job - click on a transform, and see the logged output for just that transform thanks! > View logs from Job view in Web Dashboard > > > Key: FLINK-12953 > URL: https://issues.apache.org/jira/browse/FLINK-12953 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Chad Dombrova >Priority: Major > > As a (beam) developer I want to be able to print/log information from my > custom transforms, and then monitor that output within the job view of the > Web Dashboard, so that I don't have to go hunting through the combined log in > the Job Manager view. The Job Manager log has way too much in it, spanning > all jobs, including output logged by both flink and user code. 
> A good example of how this UX should work can be found in Google Dataflow: > - click on a job, and see the logged output for that job > - click on a transform, and see the logged output for just that transform > thanks!
[GitHub] [flink] danny0405 commented on issue #8830: [FLINK-12933][sql client] support 'use catalog' and 'use database' in SQL CLI
danny0405 commented on issue #8830: [FLINK-12933][sql client] support 'use catalog' and 'use database' in SQL CLI URL: https://github.com/apache/flink/pull/8830#issuecomment-504803156 @KurtYoung @bowenli86 Since we have a new module named `flink-sql-parser` aimed at handling all the SQL parsing work, including DDL/DML/DQL, the parser module will be the only entry point for any SQL parsing in the future, so `USE CATALOG` and `USE DATABASE` are also expected to be put there. IMHO, the SQL client could have its own commands but should exclude DDL/DML/DQL, which is the duty of the SQL parser.
[jira] [Created] (FLINK-12953) View logs from Job view in Web Dashboard
Chad Dombrova created FLINK-12953: - Summary: View logs from Job view in Web Dashboard Key: FLINK-12953 URL: https://issues.apache.org/jira/browse/FLINK-12953 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Reporter: Chad Dombrova As a developer I want to be able to log information in my code, and then monitor that output within the job view of the Web Dashboard, so that I don't have to go hunting through the combined log in the Job Manager view. The Job Manager log has way too much data in it, from across all jobs, mixing both output logged by flink and user code. A good example of how this should work can be found in Google Dataflow: - click on a job, and see the logged output for that job - click on a transform, and see the logged output for just that transform thanks! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12353) Collect all modules of japicmp output under the same directory
[ https://issues.apache.org/jira/browse/FLINK-12353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-12353: - Summary: Collect all modules of japicmp output under the same directory (was: Add missing module to collect_japicmp_reports.sh) > Collect all modules of japicmp output under the same directory > -- > > Key: FLINK-12353 > URL: https://issues.apache.org/jira/browse/FLINK-12353 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > > Currently, there are eight modules using japicmp plugin. However, only four > of them would collect japicmp reports in {{collect_japicmp_reports.sh}}. I > have to modify the shell script to collect all reports and therefore I plan > to contribute this change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8843: [FLINK-12945][docs] Translate RabbitMQ Connector page into Chinese
flinkbot commented on issue #8843: [FLINK-12945][docs] Translate RabbitMQ Connector page into Chinese URL: https://github.com/apache/flink/pull/8843#issuecomment-504748730 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12945) Translate "RabbitMQ Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12945: --- Labels: pull-request-available (was: ) > Translate "RabbitMQ Connector" page into Chinese > > > Key: FLINK-12945 > URL: https://issues.apache.org/jira/browse/FLINK-12945 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > Labels: pull-request-available > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/rabbitmq.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/rabbitmq.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8842: [FLINK-12947][docs] Translate Twitter Connector page into Chinese
flinkbot commented on issue #8842: [FLINK-12947][docs] Translate Twitter Connector page into Chinese URL: https://github.com/apache/flink/pull/8842#issuecomment-504748596 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] aloyszhang opened a new pull request #8843: [FLINK-12945][docs] Translate RabbitMQ Connector page into Chinese
aloyszhang opened a new pull request #8843: [FLINK-12945][docs] Translate RabbitMQ Connector page into Chinese URL: https://github.com/apache/flink/pull/8843 ## What is the purpose of the change Translate RabbitMQ Connector page into Chinese ## Brief change log Translate RabbitMQ Connector page into Chinese ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12947) Translate "Twitter Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12947: --- Labels: pull-request-available (was: ) > Translate "Twitter Connector" page into Chinese > --- > > Key: FLINK-12947 > URL: https://issues.apache.org/jira/browse/FLINK-12947 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > Labels: pull-request-available > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/twitter.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/twitter.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] aloyszhang opened a new pull request #8842: [FLINK-12947][docs] Translate Twitter Connector page into Chinese
aloyszhang opened a new pull request #8842: [FLINK-12947][docs] Translate Twitter Connector page into Chinese URL: https://github.com/apache/flink/pull/8842 ## What is the purpose of the change Translate Twitter Connector page into Chinese ## Brief change log Translate Twitter Connector page into Chinese ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] asfgit closed pull request #8792: [FLINK-12743][table-runtime-blink] Introduce unbounded streaming anti/semi join operator
asfgit closed pull request #8792: [FLINK-12743][table-runtime-blink] Introduce unbounded streaming anti/semi join operator URL: https://github.com/apache/flink/pull/8792 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12743) Introduce unbounded streaming anti/semi join operator
[ https://issues.apache.org/jira/browse/FLINK-12743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12743. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: 16818dad7e04fac8f862771a23a702f72e605e72 > Introduce unbounded streaming anti/semi join operator > - > > Key: FLINK-12743 > URL: https://issues.apache.org/jira/browse/FLINK-12743 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > This operator is responsible for unbounded streaming semi/anti join, and will > be optimized in the following cases: > 1. If the join keys (with equality condition) are also the primary key, we will > have a more efficient state layout > 2. If the inputs have primary keys, but the join keys are not the primary key, we can > also come up with an efficient state layout > 3. If the inputs don't have primary keys, this will go to the default implementation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
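The first and last of the three state-layout cases in the ticket can be illustrated with a toy sketch. The class names below are invented for this sketch; this is not Flink's operator code:

```python
# Toy illustration of two of the state layouts described in FLINK-12743;
# these classes are invented for this sketch, not Flink's implementation.
from collections import defaultdict

class JoinKeyIsPrimaryKeyState:
    """Case 1: the join key is the primary key, so at most one row per key
    needs to be kept (upsert semantics)."""
    def __init__(self):
        self.rows = {}  # join_key -> latest row

    def add(self, key, row):
        self.rows[key] = row  # a new row for the same key replaces the old one

    def retract(self, key):
        self.rows.pop(key, None)

    def contains(self, key):
        return key in self.rows

class NoPrimaryKeyState:
    """Case 3 (default): no primary key, so duplicate rows per join key must
    be counted to handle retractions correctly."""
    def __init__(self):
        self.counts = defaultdict(lambda: defaultdict(int))  # key -> row -> count

    def add(self, key, row):
        self.counts[key][row] += 1

    def retract(self, key, row):
        if self.counts[key][row] > 1:
            self.counts[key][row] -= 1
        else:
            del self.counts[key][row]
            if not self.counts[key]:
                del self.counts[key]

    def contains(self, key):
        return key in self.counts
```

A semi join would emit a left row iff `contains(join_key)` holds on the right-side state, and an anti join iff it does not — which is why the counted layout matters when keys are not unique: dropping the key too early (or too late) on retraction would emit wrong results.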
[jira] [Assigned] (FLINK-12941) Translate "Amazon AWS Kinesis Streams Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jasper Yue reassigned FLINK-12941: -- Assignee: Jasper Yue > Translate "Amazon AWS Kinesis Streams Connector" page into Chinese > -- > > Key: FLINK-12941 > URL: https://issues.apache.org/jira/browse/FLINK-12941 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Jasper Yue >Priority: Minor > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kinesis.html" > into Chinese. > > The doc located in "flink/docs/dev/connectors/kinesis.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12945) Translate "RabbitMQ Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870533#comment-16870533 ] aloyszhang edited comment on FLINK-12945 at 6/23/19 11:51 AM: -- Hi jasper, Thanks for watching this issue. Just like the "Translate Twitter connector" issue here [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-12947?filter=allissues], I have been working on this and almost done. was (Author: aloyszhang): Hi jasper, Thanks for watching this issue. IJust like the "Translate Twitter connector" issue here [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-12947?filter=allissues], I have been working on this and almost done. > Translate "RabbitMQ Connector" page into Chinese > > > Key: FLINK-12945 > URL: https://issues.apache.org/jira/browse/FLINK-12945 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/rabbitmq.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/rabbitmq.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12945) Translate "RabbitMQ Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870533#comment-16870533 ] aloyszhang commented on FLINK-12945: Hi jasper, Thanks for watching this issue. Just like the "Translate Twitter connector" issue here [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-12947?filter=allissues], I have been working on this and almost done. > Translate "RabbitMQ Connector" page into Chinese > > > Key: FLINK-12945 > URL: https://issues.apache.org/jira/browse/FLINK-12945 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/rabbitmq.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/rabbitmq.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool
flinkbot commented on issue #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool URL: https://github.com/apache/flink/pull/8841#issuecomment-504744204 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12765) Bookkeeping of available resources of allocated slots in SlotPool.
[ https://issues.apache.org/jira/browse/FLINK-12765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12765: --- Labels: pull-request-available (was: ) > Bookkeeping of available resources of allocated slots in SlotPool. > -- > > Key: FLINK-12765 > URL: https://issues.apache.org/jira/browse/FLINK-12765 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.8.0, 1.9.0 >Reporter: Xintong Song >Assignee: Yun Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > In this version, a task always requests a slot with its own resource need. > If the resource need is less than the default slot resource, it will always > be allocated to a default sized slot. > > The extra resources in the slot leave chances for other tasks within the > same slot sharing group to fit in. To take these chances, the SlotPool will > maintain the available resources of each allocated slot. The available resource of an > allocated slot should always be the total resource of the slot minus the > resources of tasks already assigned to the slot. In this way, the SlotPool > would be able to determine whether another task can fit into the slot. If a > task cannot fit into the slot, for a slot sharing group the SlotPool should > request another slot from the ResourceManager, and for a colocation group it > should fail the job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] gaoyunhaii opened a new pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool
gaoyunhaii opened a new pull request #8841: [FLINK-12765][coordinator] Bookkeeping of available resources of allocated slots in SlotPool URL: https://github.com/apache/flink/pull/8841 ## What is the purpose of the change This PR introduces the bookkeeping logic for shared slots and colocated slots. It is part of introducing fine-grained resource management for Flink. In the current design, a task always requests resources according to its own resource need, and the returned slot may be larger than the requested resource. This leaves room for slot sharing and colocation groups. For slot sharing, if the resource is enough for all the slot requests, they are fulfilled directly; otherwise the over-allocated requests retry and apply for a new slot. Besides, when checking the resolved allocated slots, the remaining resource is used for matching instead of the total resource. For a co-location group, if the resource of the allocated slot is not enough for all the co-located tasks, the allocation fails with no retry. To be more concrete: if the requests have already exceeded the allocated resource when the slot is offered by the RM, all the requests fail directly without retry. On the other hand, if the requests have not exceeded the allocated resource when the slot is offered by the RM, they are marked as successful. However, if subsequent co-located requests find that there is not enough resource left, these new requests fail without retry. Since all the co-located tasks belong to the same region, all the co-located tasks will fail eventually. This implementation avoids postponing the requests until all requests of the co-located group have been seen, and therefore introduces no drawbacks for requests without resource requirements. ## Brief change log 1. Introduce statistics of the requested resources in the hierarchical structure of MultiTaskSlot/SingleTaskSlot to bookkeep the already requested resources. 
2. Modify the interface of `SlotSelectionStrategy` to also pass the remaining resource of the underlying slot. The implementations of the strategies should use the remaining resource instead of the total resource. 3. Add resource-checking logic for when the underlying slot is resolved. Over-allocated requests are marked as failed. The failure is retryable iff some requests are fulfilled by the underlying slot. 4. Add retry logic for over-allocated requests in `SchedulerImpl` if the exception is marked as retryable. 5. For the co-located requests, add a check of whether the remaining resource can possibly fulfill the requests if the underlying slot is already resolved. ## Verifying this change This change added tests and can be verified as follows: - Added tests that validate the calculation of the requested resources for the hierarchical structure of MultiTaskSlot/SingleTaskSlot. - Added tests that validate the routine to fail the over-allocated requests when the underlying slot is resolved. - Added tests that validate the retry logic after failing due to over-allocation. - Added tests that validate the failure of the co-located requests if the slot is not enough for all the co-located tasks. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? [Doc](https://docs.google.com/document/d/1UR3vYsLOPXMGVyXHXYg3b5yNZcvSTvot4wela8knVAY/edit?usp=sharing) This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
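The bookkeeping rule at the heart of FLINK-12765 — remaining resource = the slot's total resource minus the resources of tasks already assigned to it — can be sketched as follows. The class and method names are invented for illustration and do not match Flink's `SlotPool`/`SchedulerImpl` API:

```python
# Illustrative sketch of per-slot resource bookkeeping; the names here are
# hypothetical and are not Flink's SlotPool implementation.
class SlotBookkeeper:
    def __init__(self, total_cpu, total_mem_mb):
        self.remaining_cpu = total_cpu
        self.remaining_mem_mb = total_mem_mb

    def try_assign(self, cpu, mem_mb):
        """Assign a task iff its need fits the slot's remaining resources.

        On failure the caller would request a new slot (slot sharing group)
        or fail the job (colocation group), as the ticket describes.
        """
        if cpu <= self.remaining_cpu and mem_mb <= self.remaining_mem_mb:
            self.remaining_cpu -= cpu
            self.remaining_mem_mb -= mem_mb
            return True
        return False

    def release(self, cpu, mem_mb):
        """Return a finished task's resources to the slot."""
        self.remaining_cpu += cpu
        self.remaining_mem_mb += mem_mb
```

This is also why the PR changes matching to use the remaining resource rather than the total: once one task is assigned, the slot's total size overstates what is actually still available to the rest of the sharing group.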
[jira] [Commented] (FLINK-12947) Translate "Twitter Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870529#comment-16870529 ] aloyszhang commented on FLINK-12947: Hi jasper, Thanks for watching this issue. I have been working on this and have already finished it. I will open a PR tonight or tomorrow. > Translate "Twitter Connector" page into Chinese > --- > > Key: FLINK-12947 > URL: https://issues.apache.org/jira/browse/FLINK-12947 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/twitter.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/twitter.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] knaufk commented on issue #8607: [FLINK-12652] [documentation] add first version of a glossary
knaufk commented on issue #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#issuecomment-504732580 @NicoK @fhueske Can we merge this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12947) Translate "Twitter Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870497#comment-16870497 ] Jasper Yue commented on FLINK-12947: Hi [~aloyszhang], do you mind assigning this issue to me? > Translate "Twitter Connector" page into Chinese > --- > > Key: FLINK-12947 > URL: https://issues.apache.org/jira/browse/FLINK-12947 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/twitter.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/twitter.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12945) Translate "RabbitMQ Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870494#comment-16870494 ] Jasper Yue commented on FLINK-12945: Hi [~aloyszhang], do you mind assigning this issue to me? > Translate "RabbitMQ Connector" page into Chinese > > > Key: FLINK-12945 > URL: https://issues.apache.org/jira/browse/FLINK-12945 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: aloyszhang >Priority: Minor > > Translate the internal page > "https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/rabbitmq.html" > into Chinese. > The doc located in "flink/docs/dev/connectors/rabbitmq.zh.md" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11622) Translate the "Command-Line Interface" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jasper Yue reassigned FLINK-11622: -- Assignee: Nicholas Jiang (was: Jasper Yue) > Translate the "Command-Line Interface" page into Chinese > > > Key: FLINK-11622 > URL: https://issues.apache.org/jira/browse/FLINK-11622 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Hui Zhao >Assignee: Nicholas Jiang >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html > The markdown file is located in > https://github.com/apache/flink/blob/master/docs/ops/cli.md > The markdown file will be created once FLINK-11529 is merged. > You can reference the translation from : > https://github.com/flink-china/1.6.0/blob/master/ops/cli.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11622) Translate the "Command-Line Interface" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870488#comment-16870488 ] Jasper Yue commented on FLINK-11622: Hi [~nicholasjiang], I have not started to work on this issue. If you have already translated this page, please go ahead and open a pull request; I will assign it back to you. :D > Translate the "Command-Line Interface" page into Chinese > > > Key: FLINK-11622 > URL: https://issues.apache.org/jira/browse/FLINK-11622 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Hui Zhao >Assignee: Jasper Yue >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html > The markdown file is located in > https://github.com/apache/flink/blob/master/docs/ops/cli.md > The markdown file will be created once FLINK-11529 is merged. > You can reference the translation from: > https://github.com/flink-china/1.6.0/blob/master/ops/cli.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12783) A K8S CRD to manage job lifecycle
[ https://issues.apache.org/jira/browse/FLINK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870459#comment-16870459 ] Shahar Frank commented on FLINK-12783: -- Thanks [~rmetzger]! I subscribed to the PR you mentioned. Once it's merged I will add the entry and create a PR. Are there guidelines on how you'd like these to look, or any constraints on what I can write there? > A K8S CRD to manage job lifecycle > - > > Key: FLINK-12783 > URL: https://issues.apache.org/jira/browse/FLINK-12783 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Shahar Frank >Priority: Minor > > Hello, > I would like to suggest adding an entry to the following documentation page: > https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html > I have recently released this open-source K8S CRD that manages the entire job > lifecycle automatically. > It can all be found here: > https://github.com/srfrnk/k8s-flink-operator > This allows users to simply create images with only their Jar and deploy jobs > easily. It supports both streaming and batch jobs (as K8S CronJobs). > It allows mounting volumes, setting env and job props. > I think it may help many people who are using Flink with K8S. > We are using this in production now. -- This message was sent by Atlassian JIRA (v7.6.3#76005)