[jira] [Assigned] (FLINK-4817) Checkpoint Coordinator should be called to restore state with a specific checkpoint ID
[ https://issues.apache.org/jira/browse/FLINK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei-Che Wei reassigned FLINK-4817: -- Assignee: (was: Wei-Che Wei) > Checkpoint Coordinator should be called to restore state with a specific > checkpoint ID > -- > > Key: FLINK-4817 > URL: https://issues.apache.org/jira/browse/FLINK-4817 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen > > Rather than being called to restore the "latest" checkpoint, the Checkpoint > Coordinator should be called to restore a specific checkpoint. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896781#comment-15896781 ] ramkrishna.s.vasudevan commented on FLINK-4816: --- [~tonywei] I would like to continue with this. In fact, I was waiting for Stephan's feedback. I will submit a PR in a day or two. Thanks. > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Reporter: Stephan Ewen > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the ID of the checkpoint whose restore > was attempted.
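The wrapping rule described in FLINK-4816 could be sketched roughly as follows. This is only an illustration: the two exception names come from the issue text, but their constructors, the `DeployFailureWrapper` helper, and the use of a nullable `Long` to mean "no checkpoint restored" are assumptions, not Flink's actual code.

```java
// Hypothetical stand-in for the exception named in the issue; the constructor shape is an assumption.
class DeployTaskException extends Exception {
    DeployTaskException(Throwable cause) {
        super("Task failed while deploying", cause);
    }
}

// Hypothetical stand-in; records the ID of the checkpoint whose restore was attempted.
class RestoreTaskException extends Exception {
    private final long checkpointId;

    RestoreTaskException(long checkpointId, Throwable cause) {
        super("Task failed while restoring checkpoint " + checkpointId, cause);
        this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
        return checkpointId;
    }
}

// Hypothetical helper, not part of Flink.
final class DeployFailureWrapper {
    // restoredCheckpointId is null when no checkpoint was restored (an assumption of this sketch).
    static Exception wrap(Throwable cause, Long restoredCheckpointId) {
        if (restoredCheckpointId == null) {
            return new DeployTaskException(cause);
        }
        return new RestoreTaskException(restoredCheckpointId, cause);
    }
}
```

A caller that fails in {{DEPLOYING}} would pass the original failure plus the restored checkpoint ID (or null) and report the returned exception instead of the raw cause.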
[jira] [Comment Edited] (FLINK-5957) Remove `getAccumulatorType` method from build-in `AggregateFunction`
[ https://issues.apache.org/jira/browse/FLINK-5957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896761#comment-15896761 ] sunjincheng edited comment on FLINK-5957 at 3/6/17 5:32 AM: Hi [~fhueske], I have two ideas here. First, I would rather not use generics in the built-in Accumulator, such as: {code} class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {code} I think we should remove these generic definitions. Second, we can define the Accumulator of the built-in aggregateFunction as a POJO type, which is more concise, e.g.: {code} class IntSumAccumulator(var f0: Int, var f1: Boolean) extends Accumulator { def this() { this(0, false) } } {code} With these two changes, the built-in aggregateFunction would not have to provide the [[getAccumulatorType]] method; TypeInformation.of() works well. The built-in aggregateFunction serves as an example for user-defined aggregateFunctions, so its implementation directly affects how users implement their own. So I suggest making the above changes. What do you think? Best, SunJincheng
was (Author: sunjincheng121): Hi [~fhueske], I have two ideas here. First, I would rather not use generics in the built-in Accumulator, such as: {code} class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {code} I think we should remove these generic definitions. Second, we can define the Accumulator of the built-in aggregateFunction as a POJO type, which is more concise. With these two changes, the built-in aggregateFunction would not have to provide the [[getAccumulatorType]] method; TypeInformation.of() works well. The built-in aggregateFunction serves as an example for user-defined aggregateFunctions, so its implementation directly affects how users implement their own. So I suggest making the above changes. What do you think? Best, SunJincheng
> Remove `getAccumulatorType` method from build-in `AggregateFunction` > - > > Key: FLINK-5957 > URL: https://issues.apache.org/jira/browse/FLINK-5957 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > Built-in aggregateFunctions need not implement the `getAccumulatorType` > method. > We can get TypeInformation by `TypeInformation.of()` or > `TypeInformation.of(new TypeHint[AGG.type](){})`. > What do you think? [~fhueske] [~shaoxuan]
[jira] [Commented] (FLINK-5957) Remove `getAccumulatorType` method from build-in `AggregateFunction`
[ https://issues.apache.org/jira/browse/FLINK-5957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896761#comment-15896761 ] sunjincheng commented on FLINK-5957: Hi [~fhueske], I have two ideas here. First, I would rather not use generics in the built-in Accumulator, such as: {code} class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {code} I think we should remove these generic definitions. Second, we can define the Accumulator of the built-in aggregateFunction as a POJO type, which is more concise. With these two changes, the built-in aggregateFunction would not have to provide the [[getAccumulatorType]] method; TypeInformation.of() works well. The built-in aggregateFunction serves as an example for user-defined aggregateFunctions, so its implementation directly affects how users implement their own. So I suggest making the above changes. What do you think? Best, SunJincheng > Remove `getAccumulatorType` method from build-in `AggregateFunction` > - > > Key: FLINK-5957 > URL: https://issues.apache.org/jira/browse/FLINK-5957 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > Built-in aggregateFunctions need not implement the `getAccumulatorType` > method. > We can get TypeInformation by `TypeInformation.of()` or > `TypeInformation.of(new TypeHint[AGG.type](){})`. > What do you think? [~fhueske] [~shaoxuan]
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104343965 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala --- @@ -68,6 +62,90 @@ object CommonTestData { ) } + def getMockedFlinkExternalCatalog: ExternalCatalog = { +val csvRecord1 = Seq( + "1#1#Hi", + "2#2#Hello", + "3#2#Hello world" +) +val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp") +val externalCatalogTable1 = ExternalCatalogTable( + TableIdentifier("db1", "tb1"), + "csv", + DataSchema( +Array( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO), +Array("a", "b", "c") + ), + properties = Map( +"path" -> tempFilePath1, +"fieldDelim" -> "#", +"rowDelim" -> "$" + ) +) + +val csvRecord2 = Seq( + "1#1#0#Hallo#1", + "2#2#1#Hallo Welt#2", + "2#3#2#Hallo Welt wie#1", + "3#4#3#Hallo Welt wie gehts?#2", + "3#5#4#ABC#2", + "3#6#5#BCD#3", + "4#7#6#CDE#2", + "4#8#7#DEF#1", + "4#9#8#EFG#1", + "4#10#9#FGH#2", + "5#11#10#GHI#1", + "5#12#11#HIJ#3", + "5#13#12#IJK#3", + "5#14#13#JKL#2", + "5#15#14#KLM#2" +) +val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", "tmp") +val externalCatalogTable2 = ExternalCatalogTable( + TableIdentifier("db2", "tb2"), + "csv", + DataSchema( +Array( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO), +Array("d", "e", "f", "g", "h") + ), + properties = Map( +"path" -> tempFilePath2, +"fieldDelim" -> "#", +"rowDelim" -> "$" + ) +) +val catalog = mock(classOf[ExternalCatalog]) --- End diff -- Good idea --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104343679 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala --- @@ -68,6 +62,90 @@ object CommonTestData { ) } + def getMockedFlinkExternalCatalog: ExternalCatalog = { +val csvRecord1 = Seq( --- End diff -- `val csvRecord1 = Seq()` is just CSV data; maybe it would be better to name it `csvDataRecords`?
[jira] [Updated] (FLINK-5957) Remove `getAccumulatorType` method from build-in `AggregateFunction`
[ https://issues.apache.org/jira/browse/FLINK-5957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-5957: --- Summary: Remove `getAccumulatorType` method from build-in `AggregateFunction` (was: Remove `getAccumulatorType` method from `AggregateFunction`) > Remove `getAccumulatorType` method from build-in `AggregateFunction` > - > > Key: FLINK-5957 > URL: https://issues.apache.org/jira/browse/FLINK-5957 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > Built-in aggregateFunctions need not implement the `getAccumulatorType` > method. > We can get TypeInformation by `TypeInformation.of()` or > `TypeInformation.of(new TypeHint[AGG.type](){})`. > What do you think? [~fhueske] [~shaoxuan]
[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility
[ https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896722#comment-15896722 ] ASF GitHub Bot commented on FLINK-2720: --- Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 Okay, I will try to file a PR on this soon. > Add Storm-CountMetric in flink-stormcompatibility > - > > Key: FLINK-2720 > URL: https://issues.apache.org/jira/browse/FLINK-2720 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility > Reporter: Huang Wei > Assignee: Matthias J. Sax > Fix For: 1.0.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Add CountMetric as the first step toward Storm metrics: > 1. Add a wrapper FlinkCountMetric for CountMetric > 2. Pass the RuntimeContext into FlinkTopologyContext so that the `addAccumulator` > method can be used to register the metric.
[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...
Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 Okay, I will try to file a PR on this soon.
[jira] [Comment Edited] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
[ https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582930#comment-15582930 ] Ted Yu edited comment on FLINK-4848 at 3/6/17 3:57 AM: --- There is a similar issue with trustStoreFilePath: {code} trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); {code} was (Author: yuzhih...@gmail.com): There is a similar issue with trustStoreFilePath: {code} trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); {code} > keystoreFilePath should be checked against null in > SSLUtils#createSSLServerContext > -- > > Key: FLINK-4848 > URL: https://issues.apache.org/jira/browse/FLINK-4848 > Project: Flink > Issue Type: Bug > Components: Security > Reporter: Ted Yu > Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File constructor would throw a NullPointerException.
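A minimal sketch of the kind of guard FLINK-4848 asks for, assuming a small helper is acceptable. `StorePathCheck`, `openStore`, and the `configKey` parameter are hypothetical names introduced here for illustration; they are not part of `SSLUtils`, and the actual fix may look different.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, not Flink code.
final class StorePathCheck {
    // Fails early with a descriptive message instead of letting
    // new File((String) null) throw a bare NullPointerException.
    static InputStream openStore(String path, String configKey) throws IOException {
        if (path == null) {
            throw new IllegalArgumentException(
                "No file path configured for '" + configKey + "'");
        }
        return new FileInputStream(new File(path));
    }
}
```

The same check would apply to both keystoreFilePath and trustStoreFilePath, since both are read with a null default and passed straight to the File constructor.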
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104340593 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.schema._ +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException} +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ + +/** + * This class is responsible for connect external catalog to calcite catalog. + * In this way, it is possible to look-up and access tables in SQL queries + * without registering tables in advance. + * The databases in the external catalog registers as calcite sub-Schemas of current schema. + * The tables in a given database registers as calcite tables + * of the [[ExternalCatalogDatabaseSchema]]. 
+ * + * @param catalogIdentifier external catalog name + * @param catalog external catalog + */ +class ExternalCatalogSchema( +catalogIdentifier: String, +catalog: ExternalCatalog) extends Schema { + + private val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + /** +* Looks up database by the given sub-schema name in the external catalog, +* returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name. +* +* @param name Sub-schema name +* @return Sub-schema with a given name, or null +*/ + override def getSubSchema(name: String): Schema = { +try { + val db = catalog.getDatabase(name) + if (db != null) { +new ExternalCatalogDatabaseSchema(db.dbName, catalog) + } else { +null + } +} catch { + case e: DatabaseNotExistException => +LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier") +null +} + } + + /** +* Lists the databases of the external catalog, +* returns the lists as the names of this schema's sub-schemas. +* +* @return names of this schema's child schemas +*/ + override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava + + override def getTable(name: String): Table = null + + override def isMutable: Boolean = true + + override def getFunctions(name: String): util.Collection[Function] = +util.Collections.emptyList[Function] + + override def getExpression(parentSchema: SchemaPlus, name: String): Expression = +Schemas.subSchemaExpression(parentSchema, name, getClass) + + override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String] + + override def getTableNames: util.Set[String] = util.Collections.emptySet[String] + + override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false --- End diff -- `contentsHaveChangedSince` is set to false because we want to fetch the latest information from the underlying ExternalCatalog instead of using Calcite's caches: the table list and database list of an ExternalCatalog may change at any time.
[jira] [Updated] (FLINK-5902) Some images can not show in IE
[ https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5902: - Issue Type: Sub-task (was: Bug) Parent: FLINK-5839 > Some images can not show in IE > -- > > Key: FLINK-5902 > URL: https://issues.apache.org/jira/browse/FLINK-5902 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend > Environment: IE > Reporter: Tao Wang > Attachments: chrome is ok.png, IE 11 with problem.png > > > Some images on the Overview page are not displayed in IE, although they display correctly in Chrome. > I'm using IE 11, but I think IE 9 and IE 10 behave the same. I'll paste the screenshot > later.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1589#comment-1589 ] ASF GitHub Bot commented on FLINK-4460: --- Github user wenlong88 commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104334699 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java --- @@ -19,30 +19,35 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; /** * A function that processes elements of a stream. * - * The function will be called for every element in the input stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. * - * The function will be called for every element in the input stream and can produce - * zero or more output elements. Contrary to the - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query - * the time (both event and processing) and set timers, through the provided {@link Context}. 
- * When reacting to the firing of set timers the function can directly emit a result, and/or - * register a timer that will trigger an action in the future. + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code ProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. * * @param Type of the input elements. * @param Type of the output elements. */ @PublicEvolving -public interface ProcessFunction extends Function { +public abstract class ProcessFunction extends AbstractRichFunction { --- End diff -- hi, changing form `interface` to `class` is incompatible on the user side. Can't ProcessFunction just extend RichFunction? > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
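The compatibility concern raised in the review comment above can be illustrated with stand-in types. This is a sketch only: `RichLifecycle`, `ProcessFn`, and `UpperCaseFn` are hypothetical names, the signatures are simplified, and the `default` lifecycle methods assume Java 8, which may not match the constraints the PR was written under. The point is that user classes declared with `implements` keep compiling only while the type remains an interface.

```java
import java.util.List;

// Stand-in for a "rich" lifecycle contract; Flink's real RichFunction has more methods.
interface RichLifecycle {
    default void open() {}   // no-op setup hook (assumes Java 8 default methods)
    default void close() {}  // no-op teardown hook
}

// Stays an interface, so existing `implements ProcessFn<...>` declarations remain valid.
// Turning this into an abstract class would force every user to switch to `extends`.
interface ProcessFn<I, O> extends RichLifecycle {
    void processElement(I value, List<O> out);
}

// User code written against the interface form.
class UpperCaseFn implements ProcessFn<String, String> {
    @Override
    public void processElement(String value, List<String> out) {
        out.add(value.toUpperCase());
    }
}
```

Whether the lifecycle methods could be given defaults in the real API is a separate design question; the sketch does not cover access to the runtime context, which the abstract-class approach in the diff provides.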
[GitHub] flink pull request #3438: [FLINK-4460] Allow ProcessFunction on non-keyed st...
Github user wenlong88 commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104334699 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java --- @@ -19,30 +19,35 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; /** * A function that processes elements of a stream. * - * The function will be called for every element in the input stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. * - * The function will be called for every element in the input stream and can produce - * zero or more output elements. Contrary to the - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query - * the time (both event and processing) and set timers, through the provided {@link Context}. - * When reacting to the firing of set timers the function can directly emit a result, and/or - * register a timer that will trigger an action in the future. 
+ * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code ProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. * * @param Type of the input elements. * @param Type of the output elements. */ @PublicEvolving -public interface ProcessFunction extends Function { +public abstract class ProcessFunction extends AbstractRichFunction { --- End diff -- hi, changing form `interface` to `class` is incompatible on the user side. Can't ProcessFunction just extend RichFunction? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104333889 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -92,7 +92,12 @@ under the License. - + --- End diff -- Yes, this problem bothers me, too. The license of the reflections jar is WTFPL. Does that mean we can do anything with it? How do we check whether it is compatible with AL2? I noticed that the reflections jar is already referenced in flink-parent.pom, but only for tests, so it is not included in any Flink jars. My original thought was not only to use this jar but also to include it (like Calcite) in the flink-table-{version}.jar.
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896637#comment-15896637 ] ASF GitHub Bot commented on FLINK-4364: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3151 Hi @tillrohrmann, I submitted the modifications you suggested one week ago. Have you received them, and are there any other issues? > Implement TaskManager side of heartbeat from JobManager > --- > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: zhijiang > Assignee: zhijiang > > The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and > the {{TaskManager}} will report metrics info for each heartbeat.
[jira] [Created] (FLINK-5966) Document Running Flink on Kubernetes
StephenWithPH created FLINK-5966: Summary: Document Running Flink on Kubernetes Key: FLINK-5966 URL: https://issues.apache.org/jira/browse/FLINK-5966 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.2.0, 1.3.0 Reporter: StephenWithPH Priority: Minor There are several good sources of information regarding running prior versions of Flink in Kubernetes. I was able to follow those and fill in the gaps to get Flink 1.2 up in K8s. I plan to document my steps in detail in order to submit a PR. There are several existing PRs that may improve how Flink runs in containers. (See [FLINK-5635 Improve Docker tooling|https://github.com/apache/flink/pull/3205] and [FLINK-5634 Flink should not always redirect stdout|https://github.com/apache/flink/pull/3204]) Depending on the timing of those PRs, I may tailor my docs towards Flink 1.3 in order to reflect those changes. I'm opening this JIRA issue to begin the process. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3151 Hi @tillrohrmann, I submitted the modifications you suggested one week ago. Have you received them, and are there any other issues?
[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...
Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1157 @HuangWHWHW Are you still working on this? @RalphSu Are you interested in taking this over?
[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility
[ https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896579#comment-15896579 ] ASF GitHub Bot commented on FLINK-2720: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1157 @HuangWHWHW Are you still working on this? @RalphSu Are you interested in taking this over? > Add Storm-CountMetric in flink-stormcompatibility > - > > Key: FLINK-2720 > URL: https://issues.apache.org/jira/browse/FLINK-2720 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility > Reporter: Huang Wei > Assignee: Matthias J. Sax > Fix For: 1.0.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Add CountMetric as the first step toward Storm metrics: > 1. Add a wrapper FlinkCountMetric for CountMetric > 2. Pass the RuntimeContext into FlinkTopologyContext so that the `addAccumulator` > method can be used to register the metric.
[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4534: -- Description: Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState(): {code} for (BucketState bucketState : state.bucketStates.values()) { {code} and following in close(): {code} for (Map.Entry> entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); {code} w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting line 752: {code} Set pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); LOG.debug("Moving pending files to final location on restore."); for (Long pastCheckpointId : pastCheckpointIds) { {code} was: Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState(): {code} for (BucketState bucketState : state.bucketStates.values()) { {code} and following in close(): {code} for (Map.Entry> entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); {code} w.r.t. 
bucketState.pendingFilesPerCheckpoint , there is similar issue starting line 752: {code} Set pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); LOG.debug("Moving pending files to final location on restore."); for (Long pastCheckpointId : pastCheckpointIds) { {code} > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
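The pattern FLINK-4534 asks for, iterating over the shared map under the same lock that writers hold, might look like the sketch below. `BucketRegistry` is a hypothetical stand-in, the value type is simplified to `String`, and locking on the map itself is an assumption; the issue does not say which monitor `BucketingSink` uses in its other methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink's state holder, not Flink code.
final class BucketRegistry {
    // Plays the role of state.bucketStates in this sketch.
    private final Map<String, String> bucketStates = new HashMap<>();

    void put(String bucket, String state) {
        synchronized (bucketStates) {
            bucketStates.put(bucket, state);
        }
    }

    // Iterates while holding the lock that put() takes, so a concurrent writer
    // cannot modify the map in the middle of the loop.
    List<String> closeAll() {
        List<String> closed = new ArrayList<>();
        synchronized (bucketStates) {
            for (Map.Entry<String, String> entry : bucketStates.entrySet()) {
                closed.add(entry.getKey()); // a real sink would close the part file here
            }
        }
        return closed;
    }
}
```

The same guard would apply to the loop over `pendingFilesPerCheckpoint.keySet()`, since a key-set view reflects concurrent changes to the backing map.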
[GitHub] flink issue #3475: [FLINK-5965] Typo on DropWizard wrappers
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3475 +1 to merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5965) Typo on DropWizard wrappers
[ https://issues.apache.org/jira/browse/FLINK-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896491#comment-15896491 ] ASF GitHub Bot commented on FLINK-5965: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3475 +1 to merge. > Typo on DropWizard wrappers > --- > > Key: FLINK-5965 > URL: https://issues.apache.org/jira/browse/FLINK-5965 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Francisco Sokol >Priority: Trivial > > The metrics page, in the monitoring section, has two small typos in the > example code: > - `DropWizardHistogramWrapper` should be `DropwizardHistogramWrapper` > - `DropWizardMeterWrapper` should be `DropwizardMeterWrapper` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5909) Interface for GraphAlgorithm results
[ https://issues.apache.org/jira/browse/FLINK-5909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896265#comment-15896265 ] ASF GitHub Bot commented on FLINK-5909: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3434 Thanks for the review @vasia. The Unary/Binary/TertiaryResult interfaces will allow follow-on algorithms to work with POJOs or non-conforming Tuples (where we don't assume source and target vertices are fields 0 and 1). A common AnalyticResult interface is helpful with drivers that store both directed and undirected results. The improvements to the library methods docs do reflect an expectation of this PR. > Interface for GraphAlgorithm results > > > Key: FLINK-5909 > URL: https://issues.apache.org/jira/browse/FLINK-5909 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Affects Versions: 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.3.0 > > > Create {{AlgorithmResult}} and {{AnalyticResult}} interfaces for library > algorithms to implement. This flattens algorithm results to a single tuple. > Also create interfaces for {{UnaryResult}}, {{BinaryResult}}, and > {{TertiaryResult}} implementing methods to access the 0th, 1st, and 2nd > vertices. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
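To illustrate the point about POJOs and non-conforming tuples, here is a rough sketch of what such result interfaces might look like. The interface names come from the issue; the method names, the inheritance between the interfaces, and the `SimilarityScore` and `ResultUtil` classes are assumptions made for this example only.

```java
// Sketch only: interface names are from the issue, method names are assumed.
interface UnaryResult<K> {
    K getVertexId0();
}

// Letting each interface extend the previous one is a choice made for this sketch.
interface BinaryResult<K> extends UnaryResult<K> {
    K getVertexId1();
}

interface TertiaryResult<K> extends BinaryResult<K> {
    K getVertexId2();
}

// A POJO result whose vertices are not stored in tuple fields 0 and 1.
class SimilarityScore implements BinaryResult<Long> {
    private final double score;
    private final long source;
    private final long target;

    SimilarityScore(long source, long target, double score) {
        this.source = source;
        this.target = target;
        this.score = score;
    }

    @Override
    public Long getVertexId0() { return source; }

    @Override
    public Long getVertexId1() { return target; }

    double getScore() { return score; }
}

// A follow-on algorithm depends on the interface, not on field positions.
class ResultUtil {
    static <K> String describe(BinaryResult<K> result) {
        return result.getVertexId0() + " -> " + result.getVertexId1();
    }
}
```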
[GitHub] flink issue #3434: [FLINK-5909] [gelly] Interface for GraphAlgorithm results
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3434 Thanks for the review @vasia. The Unary/Binary/TertiaryResult interfaces will allow follow-on algorithms to work with POJOs or non-conforming Tuples (where we don't assume source and target vertices are fields 0 and 1). A common AnalyticResult interface is helpful with drivers that store both directed and undirected results. The improvements to the library methods docs do reflect an expectation of this PR.
[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896252#comment-15896252 ] ASF GitHub Bot commented on FLINK-3679: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r104312079 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -419,6 +424,164 @@ public void run() { assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); } + @Test + public void testSkipCorruptedMessage() throws Exception { + + // - some test data - + + final String topic = "test-topic"; + final int partition = 3; + final byte[] payload = new byte[] {1, 2, 3, 4}; + + final List> records = Arrays.asList( + new ConsumerRecord<>(topic, partition, 15, payload, payload), + new ConsumerRecord<>(topic, partition, 16, payload, payload), + new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes())); + + final Map>> data = new HashMap<>(); + data.put(new TopicPartition(topic, partition), records); + + final ConsumerRecords consumerRecords = new ConsumerRecords<>(data); + + // - the test consumer - + + final KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + @Override + public ConsumerRecords answer(InvocationOnMock invocation) { + return consumerRecords; + } + }); + + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // - build a fetcher - + + ArrayList results = new ArrayList<>(); + SourceContext sourceContext = new CollectingSourceContext<>(results, results); + Map partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + KeyedDeserializationSchema schema = new KeyedDeserializationSchema() { + + @Override + public String 
deserialize(byte[] messageKey, byte[] message, + String topic, int partition, long offset) throws IOException { + return offset == 15 ? null : new String(message); + } + + @Override + public boolean isEndOfStream(String nextElement) { + return "end".equals(nextElement); + } + + @Override + public TypeInformation getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } + }; + + final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + + // - run the fetcher - + + fetcher.runFetchLoop(); + assertEquals(1, results.size()); + } + + @Test + public void testNullAsEOF() throws Exception { --- End diff -- I'm not sure if this test is necessary. It's essentially just testing that `isEndOfStream` works when `isEndOfStream` is `true`. Whether or not the condition is `element == null` seems irrelevant to what's been tested. We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`. > Allow Kafka consumer to skip corrupted messages > --- > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataSt
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r104311996 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -381,6 +381,10 @@ else if (partitionsRemoved) { partitionsIterator.remove(); continue partitionsLoop; } + + if (value == null) { + continue; + } --- End diff -- Would it make sense to do the `null` checking inside `emitRecord(...)`? Otherwise, we wouldn't be updating the state for skipped records, and therefore not accounting it as "already processed". I don't think it really matters, since we aren't outputting anything anyway, but I see at least one minor advantage that might deserve changing it: If we fail during a series of continuous skipped records, we won't be wasting any overhead re-processing them on restore.
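The suggestion above can be sketched as follows. This is a simplified illustration, not the actual fetcher code: `SkippingEmitter` and its fields are hypothetical, and the real `emitRecord(...)` takes different arguments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of a fetcher; names are illustrative only.
class SkippingEmitter<T> {
    private final List<T> output = new ArrayList<>();
    private long lastProcessedOffset = -1L;

    // The null check lives inside emitRecord: a skipped record produces no
    // output, but its offset is still recorded as already processed.
    void emitRecord(T record, long offset) {
        if (record != null) {
            output.add(record);
        }
        lastProcessedOffset = offset;
    }

    long getLastProcessedOffset() {
        return lastProcessedOffset;
    }

    List<T> getOutput() {
        return output;
    }
}
```

With this shape, a restore after a run of skipped records resumes past them, because the stored offset advanced even though nothing was emitted.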
[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896253#comment-15896253 ] ASF GitHub Bot commented on FLINK-3679: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r104311996 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -381,6 +381,10 @@ else if (partitionsRemoved) { partitionsIterator.remove(); continue partitionsLoop; } + + if (value == null) { + continue; + } --- End diff -- Would it make sense to do the `null` checking inside `emitRecord(...)`? Otherwise, we wouldn't be updating the state for skipped records, and therefore not accounting it as "already processed". I don't think it really matters, since we aren't outputting anything anyway, but I see at least one minor advantage that might deserve changing it: If we fail during a series of continuous skipped records, we won't be wasting any overhead re-processing them on restore. > Allow Kafka consumer to skip corrupted messages > --- > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier >Assignee: Haohui Mai > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. 
> Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
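A FlatMap-style variant as proposed in the description could look roughly like the sketch below. `FlatDeserializer` and `LineSplittingDeserializer` are hypothetical names invented for this example and are not a Flink API; the newline-splitting rule is only there to show one input mapping to several outputs.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical interface, not a Flink API: emits zero or more elements per message.
interface FlatDeserializer<T> {
    void deserialize(byte[] message, Consumer<T> out);
}

// Example implementation: one message may carry several newline-separated elements.
class LineSplittingDeserializer implements FlatDeserializer<String> {
    @Override
    public void deserialize(byte[] message, Consumer<String> out) {
        if (message == null || message.length == 0) {
            // Corrupted or empty input: emit nothing instead of failing the job.
            return;
        }
        String text = new String(message, StandardCharsets.UTF_8);
        for (String line : text.split("\n")) {
            if (!line.isEmpty()) {
                out.accept(line);
            }
        }
    }
}
```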
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r104312079 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java --- @@ -419,6 +424,164 @@ public void run() { assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); } + @Test + public void testSkipCorruptedMessage() throws Exception { + + // - some test data - + + final String topic = "test-topic"; + final int partition = 3; + final byte[] payload = new byte[] {1, 2, 3, 4}; + + final List> records = Arrays.asList( + new ConsumerRecord<>(topic, partition, 15, payload, payload), + new ConsumerRecord<>(topic, partition, 16, payload, payload), + new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes())); + + final Map>> data = new HashMap<>(); + data.put(new TopicPartition(topic, partition), records); + + final ConsumerRecords consumerRecords = new ConsumerRecords<>(data); + + // - the test consumer - + + final KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + @Override + public ConsumerRecords answer(InvocationOnMock invocation) { + return consumerRecords; + } + }); + + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // - build a fetcher - + + ArrayList results = new ArrayList<>(); + SourceContext sourceContext = new CollectingSourceContext<>(results, results); + Map partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + KeyedDeserializationSchema schema = new KeyedDeserializationSchema() { + + @Override + public String deserialize(byte[] messageKey, byte[] message, + String topic, int partition, long offset) throws IOException { + return offset == 15 ? 
null : new String(message); + } + + @Override + public boolean isEndOfStream(String nextElement) { + return "end".equals(nextElement); + } + + @Override + public TypeInformation getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } + }; + + final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + + // - run the fetcher - + + fetcher.runFetchLoop(); + assertEquals(1, results.size()); + } + + @Test + public void testNullAsEOF() throws Exception { --- End diff -- I'm not sure if this test is necessary. It's essentially just testing that `isEndOfStream` works when `isEndOfStream` is `true`. Whether or not the condition is `element == null` seems irrelevant to what's been tested. We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.
[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...
Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 ?
[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility
[ https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896239#comment-15896239 ] ASF GitHub Bot commented on FLINK-2720: --- Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 ? > Add Storm-CountMetric in flink-stormcompatibility > - > > Key: FLINK-2720 > URL: https://issues.apache.org/jira/browse/FLINK-2720 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Huang Wei >Assignee: Matthias J. Sax > Fix For: 1.0.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Add the CountMetric for the first step of storm metrics: > 1.Do a wrapper FlinkCountMetric for CountMetric > 2.push the RuntimeContext in FlinkTopologyContext to use `addAccumulator` > method for registering the metric. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
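The two steps in the issue description (wrap the count metric, then register it through the runtime context) can be sketched in isolation as below. `SimpleCounter` and `CountingContext` are hypothetical stand-ins for Storm's count metric and for the context that owns the accumulators; neither mirrors the real Storm or Flink signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a count metric; hypothetical, not Storm's class.
class SimpleCounter {
    private long value;

    void incrBy(long n) {
        value += n;
    }

    long get() {
        return value;
    }
}

// Stand-in for the topology context that forwards metrics to an accumulator registry.
class CountingContext {
    private final Map<String, SimpleCounter> accumulators = new HashMap<>();

    // Registers the counter under a name, rejects duplicates, and returns the
    // same instance so the caller can keep incrementing it.
    SimpleCounter registerCounter(String name, SimpleCounter counter) {
        if (accumulators.putIfAbsent(name, counter) != null) {
            throw new IllegalArgumentException("Metric already registered: " + name);
        }
        return counter;
    }

    long read(String name) {
        return accumulators.get(name).get();
    }
}
```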
[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...
Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 It looks like users need to change their code here to make metrics work.
[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility
[ https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896238#comment-15896238 ] ASF GitHub Bot commented on FLINK-2720: --- Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 It looks like users need to change their code here to make metrics work. > Add Storm-CountMetric in flink-stormcompatibility > - > > Key: FLINK-2720 > URL: https://issues.apache.org/jira/browse/FLINK-2720 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Huang Wei >Assignee: Matthias J. Sax > Fix For: 1.0.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Add the CountMetric for the first step of storm metrics: > 1.Do a wrapper FlinkCountMetric for CountMetric > 2.push the RuntimeContext in FlinkTopologyContext to use `addAccumulator` > method for registering the metric. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...
Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 Any progress on this one? It looks like this has been stalled for a while.
[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility
[ https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896234#comment-15896234 ] ASF GitHub Bot commented on FLINK-2720: --- Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/1157 Any progress on this one? It looks like this has been stalled for a while. > Add Storm-CountMetric in flink-stormcompatibility > - > > Key: FLINK-2720 > URL: https://issues.apache.org/jira/browse/FLINK-2720 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Huang Wei >Assignee: Matthias J. Sax > Fix For: 1.0.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Add the CountMetric for the first step of storm metrics: > 1.Do a wrapper FlinkCountMetric for CountMetric > 2.push the RuntimeContext in FlinkTopologyContext to use `addAccumulator` > method for registering the metric. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility
[ https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896231#comment-15896231 ] ASF GitHub Bot commented on FLINK-2720: --- Github user RalphSu commented on a diff in the pull request: https://github.com/apache/flink/pull/1157#discussion_r104311095 --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java --- @@ -120,7 +129,19 @@ public ReducedMetric registerMetric(final String name, final IReducer combiner, @SuppressWarnings("unchecked") @Override public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); + + // TODO: There is no use for timeBucketSizeInSecs yet. + if (metric instanceof FlinkCountMetric) { --- End diff -- It looks like users need to change their code here to make metrics work. > Add Storm-CountMetric in flink-stormcompatibility > - > > Key: FLINK-2720 > URL: https://issues.apache.org/jira/browse/FLINK-2720 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Huang Wei >Assignee: Matthias J. Sax > Fix For: 1.0.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Add the CountMetric for the first step of storm metrics: > 1.Do a wrapper FlinkCountMetric for CountMetric > 2.push the RuntimeContext in FlinkTopologyContext to use `addAccumulator` > method for registering the metric. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMe...
Github user RalphSu commented on a diff in the pull request: https://github.com/apache/flink/pull/1157#discussion_r104311095 --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java --- @@ -120,7 +129,19 @@ public ReducedMetric registerMetric(final String name, final IReducer combiner, @SuppressWarnings("unchecked") @Override public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); + + // TODO: There is no use for timeBucketSizeInSecs yet. + if (metric instanceof FlinkCountMetric) { --- End diff -- It looks like users need to change their code here to make metrics work.
[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896187#comment-15896187 ] Wei-Che Wei commented on FLINK-4816: Hi [~ram_krish] If you have been working on it, just keep going. I will go to find another task. Thank you. > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the id of the checkpoint that was > attempted to be restored. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
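The wrapping rule in the description can be sketched as below. The two exception names come from the issue; their constructors, the `getCheckpointId()` accessor, and the `DeployFailureWrapper` helper are assumptions made for illustration and do not reflect any submitted patch.

```java
// Sketch: exception names are from the issue; constructors and fields are assumed.
class DeployTaskException extends Exception {
    DeployTaskException(Throwable cause) {
        super("Task deployment failed", cause);
    }
}

class RestoreTaskException extends Exception {
    private final long checkpointId;

    RestoreTaskException(long checkpointId, Throwable cause) {
        super("Task failed while restoring checkpoint " + checkpointId, cause);
        this.checkpointId = checkpointId;
    }

    // Id of the checkpoint whose restore was attempted.
    long getCheckpointId() {
        return checkpointId;
    }
}

// Hypothetical helper that picks the wrapper for a failure in DEPLOYING.
class DeployFailureWrapper {
    // restoredCheckpointId is null when no checkpoint was restored.
    static Exception wrap(Throwable cause, Long restoredCheckpointId) {
        if (restoredCheckpointId == null) {
            return new DeployTaskException(cause);
        }
        return new RestoreTaskException(restoredCheckpointId, cause);
    }
}
```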