[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-15 Thread Matthew Barlocker (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493956#comment-15493956
 ] 

Matthew Barlocker commented on FLINK-4618:
--

Nice find [~melmoth] and [~tzulitai] - I don't know the code well enough to 
feel confident making the change or testing the fix. Please feel free. I 
started learning Flink about a week ago, and have put a solid 3 hours into it. 

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> *Original reported ticket title: Last Kafka message gets consumed twice when 
> restarting job*
> There seems to be an issue with the offset management in Flink. When a job is 
> stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and the FsStateBackend, started with a 
> new consumer group, and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes the record at 
> 4848910 again, causing that record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in 
> ZooKeeper is 4848910.
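> A minimal, hypothetical sketch of the convention mismatch (illustrative 
> names, not Flink's actual code): Kafka treats a committed offset as the NEXT 
> record to read, so the offset of the last processed record has to be 
> incremented before it is committed.
> {code}
> import java.util.Collections
> import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
> import org.apache.kafka.common.TopicPartition
>
> object OffsetCommitSketch {
>   // Committing the last processed offset unchanged (the log below shows
>   // position 4848911 but a commit of 4848910) replays that record on restart.
>   def commitProcessed(consumer: KafkaConsumer[String, String],
>                       lastProcessedOffset: Long): Unit = {
>     val tp = new TopicPartition("myTopic", 0)
>     consumer.commitSync(
>       Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessedOffset + 1)))
>   }
> }
> {code}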
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  

[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Matthew Barlocker (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493667#comment-15493667
 ] 

Matthew Barlocker commented on FLINK-4617:
--

Sounds good. Thanks!

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the Kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
>
> object Runner {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.enableCheckpointing(500)
>     env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092")
>     properties.setProperty("group.id", "testing")
>
>     val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
>     val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
>     env.addSource(kafkaConsumer)
>       .addSink(kafkaProducer)
>
>     env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.1.2",
>   "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
>   "org.apache.flink" %% "flink-clients" % "1.1.2",
>   "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
>   "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn Flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs ZooKeeper. That didn't seem to 
> help.
> I've used different sinks (RollingFileSink, print()), hoping that maybe the 
> bug was in Kafka. That didn't seem to help.
> I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-13 Thread Matthew Barlocker (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Barlocker updated FLINK-4617:
-
Affects Version/s: 1.0.1
   1.0.2
   1.0.3

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the Kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
>
> object Runner {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.enableCheckpointing(500)
>     env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092")
>     properties.setProperty("group.id", "testing")
>
>     val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
>     val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
>     env.addSource(kafkaConsumer)
>       .addSink(kafkaProducer)
>
>     env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % "1.1.2",
>   "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
>   "org.apache.flink" %% "flink-clients" % "1.1.2",
>   "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
>   "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn Flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs ZooKeeper. That didn't seem to 
> help.
> I've used different sinks (RollingFileSink, print()), hoping that maybe the 
> bug was in Kafka. That didn't seem to help.
> I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-13 Thread Matthew Barlocker (JIRA)
Matthew Barlocker created FLINK-4617:


 Summary: Kafka & Flink duplicate messages on restart
 Key: FLINK-4617
 URL: https://issues.apache.org/jira/browse/FLINK-4617
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, State Backends, Checkpointing
Affects Versions: 1.1.2, 1.1.1, 1.1.0
 Environment: Ubuntu 16.04
Flink 1.1.*
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
Reporter: Matthew Barlocker
Priority: Critical


[StackOverflow 
Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]

Flink (the Kafka connector) re-runs the last 3-9 messages it saw before it was 
shut down.

*My code:*
{code}
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
{code}

*My sbt dependencies:*
{code}
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.1.2",
  "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
  "org.apache.flink" %% "flink-clients" % "1.1.2",
  "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
  "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
{code}

*My process:*
using 3 terminals:
{code}
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic
{code}

*My expectations:*

When there are no errors in the system, I expect to be able to turn Flink on 
and off without reprocessing messages that successfully completed the stream in 
a prior run.

*My attempts to fix:*

I've added the call to setStateBackend, thinking that perhaps the default 
memory backend just didn't remember correctly. That didn't seem to help.

I've removed the call to enableCheckpointing, hoping that perhaps there was a 
separate mechanism to track state in Flink vs ZooKeeper. That didn't seem to 
help.

I've used different sinks (RollingFileSink, print()), hoping that maybe the bug 
was in Kafka. That didn't seem to help.

I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
maybe the bug was in the latest version. That didn't seem to help.

I've added the zookeeper.connect config to the properties object, hoping that 
the comment about it only being useful in 0.8 was wrong. That didn't seem to 
help.

I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea drfloob). 
That didn't seem to help.
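
A hypothetical diagnostic sketch (separate from the job above; the broker 
address and group name are assumed from the config shown) that prints the 
offset the "testing" group has committed for testing-in. If, after a clean 
stop, the committed offset points at the last record already written to 
testing-out rather than past it, the replay comes from the consumer's committed 
offset rather than from the sink:
{code}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object CommittedOffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "testing")
    props.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val tp = new TopicPartition("testing-in", 0)
      consumer.assign(Collections.singletonList(tp))
      // committed() returns the group's committed offset, or null if none exists
      println(s"committed offset for $tp: ${consumer.committed(tp)}")
    } finally {
      consumer.close()
    }
  }
}
{code}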



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)