[jira] [Updated] (KAFKA-3960) Committed offset not set after first assign

2016-07-16 Thread Alexey Romanchuk (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Romanchuk updated KAFKA-3960:

Description: 
The committed offset is not set for a partition that is added after the first 
assign. Here is a minimal example (Scala):

{code}
  import java.util.{Collections, Properties}
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition
  import scala.collection.JavaConversions._

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("client.id", "client1")
  props.put("group.id", "client1")
  props.put("enable.auto.commit", "false")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

  // Print the position and the committed offset of every assigned partition.
  def dumpPositionAndCommitted() = {
    consumer.assignment().foreach { tp =>
      println(tp)
      println(s"Position - ${consumer.position(tp)}")
      println(s"Committed - ${consumer.committed(tp)}")
    }
    println("---")
  }

  // First assign: partition 0 only.
  consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
  dumpPositionAndCommitted()
  Thread.sleep(3000)
  // Second assign: add partition 1 to the current assignment.
  val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ consumer.assignment()
  consumer.assign(ps)
  dumpPositionAndCommitted()
  Thread.sleep(3000)
  dumpPositionAndCommitted()
{code}

and the result is

{noformat}
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
{noformat}

Pay attention to 
{noformat}
topic-1
Position - 1262864347
Committed - null
{noformat}

No committed offset is fetched from the broker for topic-1, even though one 
exists there. It looks like we should set {{needsFetchCommittedOffsets}} to 
{{true}} when a partition is assigned.
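
To make the suspected cause concrete, here is a toy model of the behaviour. It is a sketch under assumptions, not Kafka's actual code: only the flag name {{needsFetchCommittedOffsets}} comes from this report, while ToySubscriptionState, the resetFlagOnAssign switch, the string partition names, and the broker offsets 42 and 7 are made up for illustration.

```scala
import scala.collection.mutable

// Toy stand-in for the consumer's cached committed offsets (hypothetical class).
final class ToySubscriptionState(resetFlagOnAssign: Boolean) {
  private val committedCache = mutable.Map.empty[String, Option[Long]]

  // True when the cache must be refreshed from the broker before it is read.
  var needsFetchCommittedOffsets: Boolean = true

  // Manual assignment: drop partitions that are gone, add empty entries for new ones.
  def assign(partitions: Set[String]): Unit = {
    val removed = committedCache.keys.filterNot(partitions).toList
    removed.foreach(p => committedCache.remove(p))
    partitions.foreach(tp => committedCache.getOrElseUpdate(tp, None))
    // The suggested fix: mark the cache as stale on every assign.
    if (resetFlagOnAssign) needsFetchCommittedOffsets = true
  }

  // Refresh from the "broker" only when the flag says the cache is stale.
  def committed(tp: String, broker: Map[String, Long]): Option[Long] = {
    if (needsFetchCommittedOffsets) {
      committedCache.keys.toList.foreach(p => committedCache(p) = broker.get(p))
      needsFetchCommittedOffsets = false
    }
    committedCache.getOrElse(tp, None)
  }
}

object ToyDemo {
  // Offsets stored on the hypothetical broker (illustrative values only).
  val broker: Map[String, Long] = Map("topic-0" -> 42L, "topic-1" -> 7L)

  // Replays the steps of the report: assign topic-0, read it,
  // then assign both partitions and read topic-1.
  def run(resetFlagOnAssign: Boolean): Option[Long] = {
    val state = new ToySubscriptionState(resetFlagOnAssign)
    state.assign(Set("topic-0"))
    state.committed("topic-0", broker)
    state.assign(Set("topic-0", "topic-1"))
    state.committed("topic-1", broker)
  }
}
```

In this model, ToyDemo.run(resetFlagOnAssign = false) returns None, which mirrors the "Committed - null" line for topic-1, while ToyDemo.run(resetFlagOnAssign = true) returns Some(7). Whether the real client behaves the same way would need to be confirmed against its source.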



> Committed offset not set after first assign
> ---
>
> Key: KAFKA-3960
> URL: https://issues.apache.org/jira/browse/KAFKA-3960
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.0.0
> Reporter: Alexey Romanchuk
> Priority: Blocker

[jira] [Commented] (KAFKA-3960) Committed offset not set after first assign

2016-07-14 Thread Alexey Romanchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378874#comment-15378874
 ] 

Alexey Romanchuk commented on KAFKA-3960:
-

Anyone?




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3960) Committed offset not set after first assign

2016-07-13 Thread Alexey Romanchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15374813#comment-15374813
 ] 

Alexey Romanchuk commented on KAFKA-3960:
-

I cannot find any workaround for this bug. If you know of one, please share it 
in the comments.






[jira] [Created] (KAFKA-3960) Committed offset not set after first assign

2016-07-13 Thread Alexey Romanchuk (JIRA)
Alexey Romanchuk created KAFKA-3960:
---

 Summary: Committed offset not set after first assign
 Key: KAFKA-3960
 URL: https://issues.apache.org/jira/browse/KAFKA-3960
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.0.0
Reporter: Alexey Romanchuk
Priority: Blocker


The committed offset is not set for a partition that is added after the first 
assign. Here is a minimal example (Scala):

{code}
  import java.util.{Collections, Properties}
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition
  import scala.collection.JavaConversions._

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("client.id", "client1")
  props.put("group.id", "client1")
  props.put("enable.auto.commit", "false")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

  // Print the position and the committed offset of every assigned partition.
  def dumpPositionAndCommitted() = {
    consumer.assignment().foreach { tp =>
      println(tp)
      println(s"Position - ${consumer.position(tp)}")
      println(s"Committed - ${consumer.committed(tp)}")
    }
    println("---")
  }

  // First assign: partition 0 only.
  consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
  dumpPositionAndCommitted()
  Thread.sleep(3000)
  // Second assign: add partition 1 to the current assignment.
  val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ consumer.assignment()
  consumer.assign(ps)
  dumpPositionAndCommitted()
  Thread.sleep(3000)
  dumpPositionAndCommitted()
{code}

and the result is

{noformat}
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
proto7_fraud-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
{noformat}

Pay attention to 
{noformat}
topic-1
Position - 1262864347
Committed - null
{noformat}

No committed offset is fetched from the broker for topic-1, even though one 
exists there. It looks like we should set {{needsFetchCommittedOffsets}} to 
{{true}} when a partition is assigned.


