[jira] [Assigned] (FLINK-4817) Checkpoint Coordinator should be called to restore state with a specific checkpoint ID

2017-03-05 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4817:
--

Assignee: (was: Wei-Che Wei)

> Checkpoint Coordinator should be called to restore state with a specific 
> checkpoint ID
> --
>
> Key: FLINK-4817
> URL: https://issues.apache.org/jira/browse/FLINK-4817
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>
> Rather than being called to restore the "latest" checkpoint, the Checkpoint 
> Coordinator should be called to restore a specific checkpoint.
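A minimal sketch of what such an entry point could look like (the name, parameters,
and signature below are assumptions for illustration, not the actual
CheckpointCoordinator API):
{code}
/**
 * Hypothetical variant of the restore entry point that takes an explicit
 * checkpoint ID instead of implicitly picking the latest completed checkpoint.
 */
boolean restoreCheckpointedState(
		long checkpointId,
		Map<JobVertexID, ExecutionJobVertex> tasks,
		boolean allowNonRestoredState) throws Exception;
{code}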



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2017-03-05 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896781#comment-15896781
 ] 

ramkrishna.s.vasudevan commented on FLINK-4816:
---

[~tonywei]
I would like to continue with this. In fact, I was waiting for Stephan's feedback. 
I will submit a PR in a day or two. Thanks.

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.
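A minimal sketch of the wrapping described above (the exception names come from
this issue; the helper shape, constructors, and field names are assumptions):
{code}
// Hypothetical failure handling when a task fails in state DEPLOYING.
private void failFromDeploying(Throwable cause, Long restoredCheckpointId, String taskName) {
	if (restoredCheckpointId == null) {
		// no checkpoint was restored -> plain deployment failure
		throw new DeployTaskException("Could not deploy task " + taskName, cause);
	} else {
		// a checkpoint restore was attempted -> record the checkpoint ID in the failure
		throw new RestoreTaskException(
			"Could not restore task " + taskName + " from checkpoint " + restoredCheckpointId,
			restoredCheckpointId,
			cause);
	}
}
{code}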



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5957) Remove `getAccumulatorType` method from built-in `AggregateFunction`

2017-03-05 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896761#comment-15896761
 ] 

sunjincheng edited comment on FLINK-5957 at 3/6/17 5:32 AM:


Hi [~fhueske], I have two ideas here.
First of all:

In fact, I do not like using generics in the built-in accumulators, such as:
{code}
class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
{code}
I think we should remove these generic definitions.

Secondly:
We can define the accumulator of each built-in aggregateFunction as a POJO type, 
which is more concise, e.g.:
{code}
class IntSumAccumulator(var f0: Int, var f1: Boolean) extends Accumulator {
  def this() {
    this(0, false)
  }
}
{code}
IMO, if we make the above two changes, the built-in aggregateFunctions no longer 
need to provide a [[getAccumulatorType]] method; TypeInformation.of() works well.
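For illustration, a minimal sketch of that idea in Java (assuming Flink's
TypeInformation/TypeHint API; the POJO below is illustrative, not the actual
built-in accumulator):
{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class AccumulatorTypeDemo {

	// Illustrative POJO accumulator: public fields and a no-arg constructor,
	// so Flink's type extraction can analyze it reflectively.
	public static class IntSumAccumulator {
		public int sum;
		public boolean nonEmpty;
		public IntSumAccumulator() {}
	}

	public static void main(String[] args) {
		// No getAccumulatorType() method needed; the type is derived directly:
		TypeInformation<IntSumAccumulator> accType =
			TypeInformation.of(new TypeHint<IntSumAccumulator>() {});
		System.out.println(accType);
	}
}
{code}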


The built-in aggregateFunctions serve as examples for user-defined 
aggregateFunctions, so the implementation of the built-in aggregateFunctions 
directly affects users' implementations.

So, IMO, I suggest making the above changes. What do you think?
Best,
SunJincheng


was (Author: sunjincheng121):
Hi [~fhueske], I have two ideas here.
First of all:

In fact, I do not like using generics in the built-in accumulators, such as:
{code}
class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
{code}
I think we should remove these generic definitions.

Secondly:
We can define the accumulator of each built-in aggregateFunction as a POJO type, 
which is more concise.

IMO, if we make the above two changes, the built-in aggregateFunctions no longer 
need to provide a [[getAccumulatorType]] method; TypeInformation.of() works well.

The built-in aggregateFunctions serve as examples for user-defined 
aggregateFunctions, so the implementation of the built-in aggregateFunctions 
directly affects users' implementations.

So, IMO, I suggest making the above changes. What do you think?
Best,
SunJincheng

> Remove `getAccumulatorType` method from built-in `AggregateFunction`
> -
>
> Key: FLINK-5957
> URL: https://issues.apache.org/jira/browse/FLINK-5957
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Built-in aggregateFunctions need not implement the `getAccumulatorType` 
> method. 
> We can get the TypeInformation via `TypeInformation.of()` or 
> `TypeInformation.of(new TypeHint[AGG.type](){})`. 
> What do you think? [~fhueske] [~shaoxuan] 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5957) Remove `getAccumulatorType` method from built-in `AggregateFunction`

2017-03-05 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896761#comment-15896761
 ] 

sunjincheng commented on FLINK-5957:


Hi [~fhueske], I have two ideas here.
First of all:

In fact, I do not like using generics in the built-in accumulators, such as:
{code}
class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
{code}
I think we should remove these generic definitions.

Secondly:
We can define the accumulator of each built-in aggregateFunction as a POJO type, 
which is more concise.

IMO, if we make the above two changes, the built-in aggregateFunctions no longer 
need to provide a [[getAccumulatorType]] method; TypeInformation.of() works well.

The built-in aggregateFunctions serve as examples for user-defined 
aggregateFunctions, so the implementation of the built-in aggregateFunctions 
directly affects users' implementations.

So, IMO, I suggest making the above changes. What do you think?
Best,
SunJincheng

> Remove `getAccumulatorType` method from built-in `AggregateFunction`
> -
>
> Key: FLINK-5957
> URL: https://issues.apache.org/jira/browse/FLINK-5957
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Built-in aggregateFunctions need not implement the `getAccumulatorType` 
> method. 
> We can get the TypeInformation via `TypeInformation.of()` or 
> `TypeInformation.of(new TypeHint[AGG.type](){})`. 
> What do you think? [~fhueske] [~shaoxuan] 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104343965
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
 ---
@@ -68,6 +62,90 @@ object CommonTestData {
 )
   }
 
+  def getMockedFlinkExternalCatalog: ExternalCatalog = {
+val csvRecord1 = Seq(
+  "1#1#Hi",
+  "2#2#Hello",
+  "3#2#Hello world"
+)
+val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), 
"csv-test1", "tmp")
+val externalCatalogTable1 = ExternalCatalogTable(
+  TableIdentifier("db1", "tb1"),
+  "csv",
+  DataSchema(
+Array(
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO),
+Array("a", "b", "c")
+  ),
+  properties = Map(
+"path" -> tempFilePath1,
+"fieldDelim" -> "#",
+"rowDelim" -> "$"
+  )
+)
+
+val csvRecord2 = Seq(
+  "1#1#0#Hallo#1",
+  "2#2#1#Hallo Welt#2",
+  "2#3#2#Hallo Welt wie#1",
+  "3#4#3#Hallo Welt wie gehts?#2",
+  "3#5#4#ABC#2",
+  "3#6#5#BCD#3",
+  "4#7#6#CDE#2",
+  "4#8#7#DEF#1",
+  "4#9#8#EFG#1",
+  "4#10#9#FGH#2",
+  "5#11#10#GHI#1",
+  "5#12#11#HIJ#3",
+  "5#13#12#IJK#3",
+  "5#14#13#JKL#2",
+  "5#15#14#KLM#2"
+)
+val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), 
"csv-test2", "tmp")
+val externalCatalogTable2 = ExternalCatalogTable(
+  TableIdentifier("db2", "tb2"),
+  "csv",
+  DataSchema(
+Array(
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO),
+Array("d", "e", "f", "g", "h")
+  ),
+  properties = Map(
+"path" -> tempFilePath2,
+"fieldDelim" -> "#",
+"rowDelim" -> "$"
+  )
+)
+val catalog = mock(classOf[ExternalCatalog])
--- End diff --

Good idea


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104343679
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
 ---
@@ -68,6 +62,90 @@ object CommonTestData {
 )
   }
 
+  def getMockedFlinkExternalCatalog: ExternalCatalog = {
+val csvRecord1 = Seq(
--- End diff --

`csvRecord1 = Seq(...)` is just CSV data; maybe it's better to name it 
csvDataRecords?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5957) Remove `getAccumulatorType` method from built-in `AggregateFunction`

2017-03-05 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-5957:
---
Summary: Remove `getAccumulatorType` method from built-in 
`AggregateFunction`  (was: Remove `getAccumulatorType` method from 
`AggregateFunction`)

> Remove `getAccumulatorType` method from built-in `AggregateFunction`
> -
>
> Key: FLINK-5957
> URL: https://issues.apache.org/jira/browse/FLINK-5957
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Built-in aggregateFunctions need not implement the `getAccumulatorType` 
> method. 
> We can get the TypeInformation via `TypeInformation.of()` or 
> `TypeInformation.of(new TypeHint[AGG.type](){})`. 
> What do you think? [~fhueske] [~shaoxuan] 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896722#comment-15896722
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
Okay, I will try to file a PR on this soon.


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.
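A rough sketch of the wrapper idea described above (the class shape and the
accumulator choice are assumptions, not the PR's actual code; IMetric is Storm's
metric interface):
{code}
import backtype.storm.metric.api.IMetric;
import org.apache.flink.api.common.accumulators.LongCounter;

// Hypothetical wrapper: exposes Storm's IMetric contract while backing the
// count with a Flink accumulator that can be registered via addAccumulator().
public class FlinkCountMetric implements IMetric {

	private final LongCounter counter = new LongCounter();

	public void incr() {
		counter.add(1L);
	}

	public LongCounter getAccumulator() {
		return counter;
	}

	@Override
	public Object getValueAndReset() {
		long value = counter.getLocalValue();
		counter.resetLocal();
		return value;
	}
}
{code}
Registration would then go through the RuntimeContext held by
FlinkTopologyContext, e.g. runtimeContext.addAccumulator(name, metric.getAccumulator()).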



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-05 Thread RalphSu
Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
Okay, I will try to file a PR on this soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2017-03-05 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582930#comment-15582930
 ] 

Ted Yu edited comment on FLINK-4848 at 3/6/17 3:57 AM:
---

There is a similar issue with trustStoreFilePath:

{code}
trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
{code}


was (Author: yuzhih...@gmail.com):
There is a similar issue with trustStoreFilePath:
{code}
trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
{code}

> keystoreFilePath should be checked against null in 
> SSLUtils#createSSLServerContext
> --
>
> Key: FLINK-4848
> URL: https://issues.apache.org/jira/browse/FLINK-4848
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   String keystoreFilePath = sslConfig.getString(
> ConfigConstants.SECURITY_SSL_KEYSTORE,
> null);
> ...
>   try {
> keyStoreFile = new FileInputStream(new File(keystoreFilePath));
> {code}
> If keystoreFilePath is null, the File ctor would throw NPE.
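A minimal sketch of the guard, as a fragment (assuming Flink's
IllegalConfigurationException; this is not the actual SSLUtils code):
{code}
String keystoreFilePath = sslConfig.getString(
	ConfigConstants.SECURITY_SSL_KEYSTORE,
	null);

// Fail fast with a descriptive message instead of letting
// new File(null) throw a bare NullPointerException.
if (keystoreFilePath == null) {
	throw new IllegalConfigurationException(
		ConfigConstants.SECURITY_SSL_KEYSTORE + " is not set but SSL is enabled");
}

keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}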



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104340593
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.schema._
+import org.apache.flink.table.api.{DatabaseNotExistException, 
TableNotExistException}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+
+/**
+  * This class is responsible for connecting an external catalog to the Calcite catalog.
+  * In this way, it is possible to look up and access tables in SQL queries
+  * without registering them in advance.
+  * The databases in the external catalog are registered as Calcite sub-schemas of the current schema.
+  * The tables in a given database are registered as Calcite tables
+  * of the [[ExternalCatalogDatabaseSchema]].
+  *
+  * @param catalogIdentifier external catalog name
+  * @param catalog   external catalog
+  */
+class ExternalCatalogSchema(
+catalogIdentifier: String,
+catalog: ExternalCatalog) extends Schema {
+
+  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  /**
+* Looks up the database with the given sub-schema name in the external catalog,
+* and returns it wrapped in an [[ExternalCatalogDatabaseSchema]] with the given database name.
+*
+* @param name Sub-schema name
+* @return Sub-schema with a given name, or null
+*/
+  override def getSubSchema(name: String): Schema = {
+try {
+  val db = catalog.getDatabase(name)
+  if (db != null) {
+new ExternalCatalogDatabaseSchema(db.dbName, catalog)
+  } else {
+null
+  }
+} catch {
+  case e: DatabaseNotExistException =>
+LOG.warn(s"database $name does not exist in externalCatalog 
$catalogIdentifier")
+null
+}
+  }
+
+  /**
+* Lists the databases of the external catalog,
+* returns the lists as the names of this schema's sub-schemas.
+*
+* @return names of this schema's child schemas
+*/
+  override def getSubSchemaNames: util.Set[String] = 
catalog.listDatabases().toSet.asJava
+
+  override def getTable(name: String): Table = null
+
+  override def isMutable: Boolean = true
+
+  override def getFunctions(name: String): util.Collection[Function] =
+util.Collections.emptyList[Function]
+
+  override def getExpression(parentSchema: SchemaPlus, name: String): 
Expression =
+Schemas.subSchemaExpression(parentSchema, name, getClass)
+
+  override def getFunctionNames: util.Set[String] = 
util.Collections.emptySet[String]
+
+  override def getTableNames: util.Set[String] = 
util.Collections.emptySet[String]
+
+  override def contentsHaveChangedSince(lastCheck: Long, now: Long): 
Boolean = false
--- End diff --

contentsHaveChangedSince returns false because we want to fetch the latest 
information from the underlying ExternalCatalog instead of using Calcite's 
caches; the table and database lists of the ExternalCatalog may change at 
any time. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5902) Some images can not show in IE

2017-03-05 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5902:
-
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-5839

> Some images can not show in IE
> --
>
> Key: FLINK-5902
> URL: https://issues.apache.org/jira/browse/FLINK-5902
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
> Environment: IE
>Reporter: Tao Wang
> Attachments: chrome is ok.png, IE 11 with problem.png
>
>
> Some images on the Overview page do not show in IE, although they display fine in Chrome.
> I'm using IE 11, but I expect the same with IE9 and IE10. I'll paste the screenshots 
> later.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1589#comment-1589
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user wenlong88 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104334699
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 ---
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
 /**
  * A function that processes elements of a stream.
  *
- * The function will be called for every element in the input stream 
and can produce
- * zero or more output. The function can also query the time and set 
timers. When
- * reacting to the firing of set timers the function can emit yet more 
elements.
+ * For every element in the input stream {@link #processElement(Object, 
Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. 
Implementations can also
+ * query the time and set timers through the provided {@link Context}. For 
firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This 
can again produce
+ * zero or more elements as output and register further timers.
  *
- * The function will be called for every element in the input stream 
and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this 
function can also query
- * the time (both event and processing) and set timers, through the 
provided {@link Context}.
- * When reacting to the firing of set timers the function can directly 
emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * NOTE: Access to keyed state and timers (which are also scoped 
to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code 
KeyedStream}.
+ *
+ * NOTE: A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
+ * teardown methods can be implemented. See
+ * {@link 
org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param <I> Type of the input elements.
  * @param <O> Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction<I, O> extends Function {
+public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
--- End diff --

Hi, changing from `interface` to `class` is incompatible on the user side. 
Can't ProcessFunction just extend RichFunction?
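A sketch of the suggested alternative (hypothetical; Context and OnTimerContext
stand in for the context types from the diff above):
{code}
// Keep ProcessFunction an interface and inherit the rich lifecycle
// (open/close, getRuntimeContext) by extending RichFunction directly.
public interface ProcessFunction<I, O> extends RichFunction {

	void processElement(I value, Context ctx, Collector<O> out) throws Exception;

	void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}
{code}
(One caveat: an interface cannot provide the empty default implementations of
open()/close() that AbstractRichFunction gives, so every user class would have
to implement them.)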


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3438: [FLINK-4460] Allow ProcessFunction on non-keyed st...

2017-03-05 Thread wenlong88
Github user wenlong88 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104334699
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 ---
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
 /**
  * A function that processes elements of a stream.
  *
- * The function will be called for every element in the input stream 
and can produce
- * zero or more output. The function can also query the time and set 
timers. When
- * reacting to the firing of set timers the function can emit yet more 
elements.
+ * For every element in the input stream {@link #processElement(Object, 
Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. 
Implementations can also
+ * query the time and set timers through the provided {@link Context}. For 
firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This 
can again produce
+ * zero or more elements as output and register further timers.
  *
- * The function will be called for every element in the input stream 
and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this 
function can also query
- * the time (both event and processing) and set timers, through the 
provided {@link Context}.
- * When reacting to the firing of set timers the function can directly 
emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * NOTE: Access to keyed state and timers (which are also scoped 
to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code 
KeyedStream}.
+ *
+ * NOTE: A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
+ * teardown methods can be implemented. See
+ * {@link 
org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param <I> Type of the input elements.
  * @param <O> Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction<I, O> extends Function {
+public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
--- End diff --

Hi, changing from `interface` to `class` is incompatible on the user side. 
Can't ProcessFunction just extend RichFunction?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104333889
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -92,7 +92,12 @@ under the License.


 
-
+   
--- End diff --

Yes, this problem bothers me, too. The license of the reflections jar is 
WTFPL; does that mean we may do anything with it, and how do we check whether it 
is compatible with AL2? I noticed that the reflections jar is already referenced 
in the flink-parent pom, but only for tests, so it would not be included in any 
Flink jars. My original thought was to not only use this jar but also include it 
(like Calcite) in the flink-table-{version}.jar. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896637#comment-15896637
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3151
  
Hi @tillrohrmann, I submitted the modifications you suggested one 
week ago. Have you seen them, and are there any other issues?


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhijiang
>Assignee: zhijiang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5966) Document Running Flink on Kubernetes

2017-03-05 Thread StephenWithPH (JIRA)
StephenWithPH created FLINK-5966:


 Summary: Document Running Flink on Kubernetes
 Key: FLINK-5966
 URL: https://issues.apache.org/jira/browse/FLINK-5966
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.2.0, 1.3.0
Reporter: StephenWithPH
Priority: Minor


There are several good sources of information regarding running prior versions 
of Flink in Kubernetes. I was able to follow those and fill in the gaps to get 
Flink 1.2 up in K8s.

I plan to document my steps in detail in order to submit a PR. There are 
several existing PRs that may improve how Flink runs in containers. (See 
[FLINK-5635 Improve Docker tooling|https://github.com/apache/flink/pull/3205] 
and [FLINK-5634 Flink should not always redirect 
stdout|https://github.com/apache/flink/pull/3204])

Depending on the timing of those PRs, I may tailor my docs towards Flink 1.3 in 
order to reflect those changes.

I'm opening this JIRA issue to begin the process.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...

2017-03-05 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3151
  
Hi @tillrohrmann, I submitted the modifications you suggested one 
week ago. Have you seen them, and are there any other issues?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-05 Thread mjsax
Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1157
  
@HuangWHWHW Are you still working on this? @RalphSu Are you interested in 
taking this over?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896579#comment-15896579
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1157
  
@HuangWHWHW Are you still working on this? @RalphSu Are you interested in 
taking this over?


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-03-05 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Description: 
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():
{code}
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting 
at line 752:
{code}
  Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}

  was:
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():

{code}
for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():
{code}
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting 
at line 752:
{code}
  Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}


> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
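A minimal sketch of the fix (assuming the other methods synchronize on
state.bucketStates, as the description implies):
{code}
// Guard the iteration in close() with the same monitor used elsewhere:
synchronized (state.bucketStates) {
	for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
		closeCurrentPartFile(entry.getValue());
	}
}
{code}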



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3475: [FLINK-5965] Typo on DropWizard wrappers

2017-03-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3475
  
+1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5965) Typo on DropWizard wrappers

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896491#comment-15896491
 ] 

ASF GitHub Bot commented on FLINK-5965:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3475
  
+1 to merge.


> Typo on DropWizard wrappers
> ---
>
> Key: FLINK-5965
> URL: https://issues.apache.org/jira/browse/FLINK-5965
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Francisco Sokol
>Priority: Trivial
>
> The metrics page, in the monitoring section, has two small typos in the 
> example code:
> - `DropWizardHistogramWrapper` should be  `DropwizardHistogramWrapper`
> - `DropWizardMeterWrapper` should be `DropwizardMeterWrapper`
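For reference, the corrected names in use (a sketch along the lines of the docs
example; the reservoir choice is illustrative):
{code}
import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;

// Inside a RichFunction's open():
com.codahale.metrics.Histogram dropwizardHistogram =
	new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
Histogram histogram = getRuntimeContext()
	.getMetricGroup()
	.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));

com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
Meter meter = getRuntimeContext()
	.getMetricGroup()
	.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
{code}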



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5909) Interface for GraphAlgorithm results

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896265#comment-15896265
 ] 

ASF GitHub Bot commented on FLINK-5909:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3434
  
Thanks for the review @vasia. The Unary/Binary/TertiaryResult interfaces 
will allow follow-on algorithms to work with POJOs or non-conforming Tuples 
(where we don't assume source and target vertices are fields 0 and 1). A common 
AnalyticResult interface is helpful with drivers that store both directed and 
undirected results.

The improvements to the library methods docs do reflect an expectation of 
this PR.


> Interface for GraphAlgorithm results
> 
>
> Key: FLINK-5909
> URL: https://issues.apache.org/jira/browse/FLINK-5909
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> Create {{AlgorithmResult}} and {{AnalyticResult}} interfaces for library 
> algorithms to implement. This flattens algorithm results to a single tuple.
> Also create interfaces for {{UnaryResult}}, {{BinaryResult}}, and 
> {{TertiaryResult}} implementing methods to access the 0th, 1st, and 2nd 
> vertices.
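A sketch of what these could look like (the interface names come from the issue;
the method names are assumptions):
{code}
// Accessors for the vertices carried in an algorithm's flattened result tuple.
public interface UnaryResult<K> {
	K getVertexId0();
	void setVertexId0(K value);
}

public interface BinaryResult<K> extends UnaryResult<K> {
	K getVertexId1();
	void setVertexId1(K value);
}

public interface TertiaryResult<K> extends BinaryResult<K> {
	K getVertexId2();
	void setVertexId2(K value);
}
{code}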



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3434: [FLINK-5909] [gelly] Interface for GraphAlgorithm results

2017-03-05 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3434
  
Thanks for the review @vasia. The Unary/Binary/TertiaryResult interfaces 
will allow follow-on algorithms to work with POJOs or non-conforming Tuples 
(where we don't assume source and target vertices are fields 0 and 1). A common 
AnalyticResult interface is helpful with drivers that store both directed and 
undirected results.

The improvements to the library methods docs do reflect an expectation of 
this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896252#comment-15896252
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r104312079
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -419,6 +424,164 @@ public void run() {
assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
}
 
+   @Test
+   public void testSkipCorruptedMessage() throws Exception {
+
+   // - some test data -
+
+   final String topic = "test-topic";
+   final int partition = 3;
+   final byte[] payload = new byte[] {1, 2, 3, 4};
+
+   final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+       new ConsumerRecord<>(topic, partition, 15, payload, payload),
+       new ConsumerRecord<>(topic, partition, 16, payload, payload),
+       new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+   final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+   data.put(new TopicPartition(topic, partition), records);
+
+   final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+   // - the test consumer -
+
+   final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+   when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+       @Override
+       public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) {
+           return consumerRecords;
+       }
+   });
+
+   
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+   // - build a fetcher -
+
+   ArrayList<String> results = new ArrayList<>();
+   SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+   Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+       Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+   KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+       @Override
+       public String deserialize(byte[] messageKey, byte[] message,
+               String topic, int partition, long offset) throws IOException {
+           return offset == 15 ? null : new String(message);
+       }
+
+       @Override
+       public boolean isEndOfStream(String nextElement) {
+           return "end".equals(nextElement);
+       }
+
+       @Override
+       public TypeInformation<String> getProducedType() {
+           return BasicTypeInfo.STRING_TYPE_INFO;
+       }
+   };
+
+   final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+   sourceContext,
+   partitionsWithInitialOffsets,
+   null, /* periodic watermark extractor */
+   null, /* punctuated watermark extractor */
+   new TestProcessingTimeService(),
+   10, /* watermark interval */
+   this.getClass().getClassLoader(),
+   true, /* checkpointing */
+   "task_name",
+   new UnregisteredMetricsGroup(),
+   schema,
+   new Properties(),
+   0L,
+   false);
+
+
+   // - run the fetcher -
+
+   fetcher.runFetchLoop();
+   assertEquals(1, results.size());
+   }
+
+   @Test
+   public void testNullAsEOF() throws Exception {
--- End diff --

I'm not sure if this test is necessary. It's essentially just testing that 
`isEndOfStream` works when `isEndOfStream` is `true`. Whether or not the 
condition is `element == null` seems irrelevant to what's been tested.

We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.




> Allow Kafka consumer to skip corrupted messages
> ---
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-03-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r104311996
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -381,6 +381,10 @@ else if (partitionsRemoved) {

partitionsIterator.remove();
continue 
partitionsLoop;
}
+
+   if (value == null) {
+   continue;
+   }
--- End diff --

Would it make sense to do the `null` check inside `emitRecord(...)`?
Otherwise, we wouldn't be updating the state for skipped records, and 
therefore not accounting for them as "already processed".

I don't think it really matters, since we aren't outputting anything 
anyway, but I see at least one minor advantage that might make the change 
worthwhile: if we fail during a series of consecutive skipped records, we 
won't waste any overhead re-processing them on restore.
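A sketch of that placement (the method shape is an assumption based on the
fetcher's emitRecord/offset-state pattern, not the actual code):
{code}
// Hypothetical: handle null (skipped) records inside emitRecord(...) so the
// partition offset still advances and restores don't re-read skipped records.
protected void emitRecord(T record, KafkaTopicPartitionState<TopicAndPartition> partitionState, long offset) throws Exception {
	if (record == null) {
		synchronized (checkpointLock) {
			partitionState.setOffset(offset);
		}
		return;
	}
	// ... existing emit logic ...
}
{code}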


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3679) Allow Kafka consumer to skip corrupted messages

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896253#comment-15896253
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r104311996
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -381,6 +381,10 @@ else if (partitionsRemoved) {

partitionsIterator.remove();
continue 
partitionsLoop;
}
+
+   if (value == null) {
+   continue;
+   }
--- End diff --

Would it make sense to do the `null` check inside `emitRecord(...)`?
Otherwise, we wouldn't be updating the state for skipped records, and 
therefore not accounting for them as "already processed".

I don't think it really matters, since we aren't outputting anything 
anyway, but I see at least one minor advantage that might make the change 
worthwhile: if we fail during a series of consecutive skipped records, we 
won't waste any overhead re-processing them on restore.


> Allow Kafka consumer to skip corrupted messages
> ---
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-03-05 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r104312079
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -419,6 +424,164 @@ public void run() {
assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
}
 
+   @Test
+   public void testSkipCorruptedMessage() throws Exception {
+
+   // - some test data -
+
+   final String topic = "test-topic";
+   final int partition = 3;
+   final byte[] payload = new byte[] {1, 2, 3, 4};
+
+   final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+       new ConsumerRecord<>(topic, partition, 15, payload, payload),
+       new ConsumerRecord<>(topic, partition, 16, payload, payload),
+       new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+   final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+   data.put(new TopicPartition(topic, partition), records);
+
+   final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+   // - the test consumer -
+
+   final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+   when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() {
+       @Override
+       public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) {
+           return consumerRecords;
+       }
+   });
+
+   
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+   // - build a fetcher -
+
+   ArrayList<String> results = new ArrayList<>();
+   SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+   Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+       Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+   KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+       @Override
+       public String deserialize(byte[] messageKey, byte[] message,
+               String topic, int partition, long offset) throws IOException {
+           return offset == 15 ? null : new String(message);
+       }
+
+       @Override
+       public boolean isEndOfStream(String nextElement) {
+           return "end".equals(nextElement);
+       }
+
+       @Override
+       public TypeInformation<String> getProducedType() {
+           return BasicTypeInfo.STRING_TYPE_INFO;
+       }
+   };
+
+   final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+   sourceContext,
+   partitionsWithInitialOffsets,
+   null, /* periodic watermark extractor */
+   null, /* punctuated watermark extractor */
+   new TestProcessingTimeService(),
+   10, /* watermark interval */
+   this.getClass().getClassLoader(),
+   true, /* checkpointing */
+   "task_name",
+   new UnregisteredMetricsGroup(),
+   schema,
+   new Properties(),
+   0L,
+   false);
+
+
+   // - run the fetcher -
+
+   fetcher.runFetchLoop();
+   assertEquals(1, results.size());
+   }
+
+   @Test
+   public void testNullAsEOF() throws Exception {
--- End diff --

I'm not sure if this test is necessary. It's essentially just testing that 
`isEndOfStream` works when `isEndOfStream` is `true`. Whether or not the 
condition is `element == null` seems irrelevant to what's been tested.

We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-05 Thread RalphSu
Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896239#comment-15896239
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
?


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-05 Thread RalphSu
Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
It looks like the user needs to change code here to make metrics work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896238#comment-15896238
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
It looks like the user needs to change code here to make metrics work.


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896237#comment-15896237
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
It looks like the user needs to change code here to make metrics work.


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-05 Thread RalphSu
Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
It looks like the user needs to change code here to make metrics work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMetric fo...

2017-03-05 Thread RalphSu
Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
Any progress on this one? It looks like this has been stalled for a while.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896234#comment-15896234
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/1157
  
Any progress on this one? It looks like this has been stalled for a while.


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896231#comment-15896231
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user RalphSu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1157#discussion_r104311095
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
 ---
@@ -120,7 +129,19 @@ public ReducedMetric registerMetric(final String name, 
final IReducer combiner,
@SuppressWarnings("unchecked")
@Override
public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
-   throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+
+   // TODO: There is no use for timeBucketSizeInSecs yet.
+   if (metric instanceof FlinkCountMetric) {
--- End diff --

It looks like the user needs to change code here to make metrics work.


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMe...

2017-03-05 Thread RalphSu
Github user RalphSu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1157#discussion_r104311095
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
 ---
@@ -120,7 +129,19 @@ public ReducedMetric registerMetric(final String name, 
final IReducer combiner,
@SuppressWarnings("unchecked")
@Override
public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
-   throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+
+   // TODO: There is no use for timeBucketSizeInSecs yet.
+   if (metric instanceof FlinkCountMetric) {
--- End diff --

It looks like the user needs to change code here to make metrics work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #1157: [FLINK-2720][storm-compatibility]Add Storm-CountMe...

2017-03-05 Thread RalphSu
Github user RalphSu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1157#discussion_r104311065
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
 ---
@@ -120,7 +129,19 @@ public ReducedMetric registerMetric(final String name, 
final IReducer combiner,
@SuppressWarnings("unchecked")
@Override
public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
-   throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+
+   // TODO: There is no use for timeBucketSizeInSecs yet.
+   if (metric instanceof FlinkCountMetric) {
--- End diff --

It looks like the user needs to change code here to make metrics work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2720) Add Storm-CountMetric in flink-stormcompatibility

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896228#comment-15896228
 ] 

ASF GitHub Bot commented on FLINK-2720:
---

Github user RalphSu commented on a diff in the pull request:

https://github.com/apache/flink/pull/1157#discussion_r104311065
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
 ---
@@ -120,7 +129,19 @@ public ReducedMetric registerMetric(final String name, 
final IReducer combiner,
@SuppressWarnings("unchecked")
@Override
public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
-   throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+
+   // TODO: There is no use for timeBucketSizeInSecs yet.
+   if (metric instanceof FlinkCountMetric) {
--- End diff --

It looks like the user needs to change code here to make metrics work.


> Add Storm-CountMetric in flink-stormcompatibility
> -
>
> Key: FLINK-2720
> URL: https://issues.apache.org/jira/browse/FLINK-2720
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: Huang Wei
>Assignee: Matthias J. Sax
> Fix For: 1.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Add the CountMetric as the first step of the Storm metrics:
> 1. Write a wrapper FlinkCountMetric for CountMetric.
> 2. Push the RuntimeContext into FlinkTopologyContext and use the `addAccumulator` 
> method to register the metric.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2017-03-05 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896187#comment-15896187
 ] 

Wei-Che Wei commented on FLINK-4816:


Hi [~ram_krish],

If you have been working on it, just keep going. I will go find another 
task. Thank you.

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)