[jira] [Closed] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-10-01 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi closed BEAM-4038.
--
   Resolution: Fixed
Fix Version/s: Not applicable

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-10-01 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634641#comment-16634641
 ] 

Raghu Angadi commented on BEAM-4038:


Sounds good. I will close this for now. We can add write support if there is 
demand for it.

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
>  Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-5375) KafkaIO reader should handle runtime exceptions from kafka client

2018-09-18 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-5375:
---
Description: 
KafkaIO reader might stop reading from Kafka without any explicit error message 
if KafkaConsumer throws a runtime exception while polling for messages. One of 
the Dataflow customers encountered this issue (see [user@ 
thread|https://lists.apache.org/thread.html/c0cf8f45f567a0623592e2d8340f5288e3e774b59bca985aec410a81@%3Cuser.beam.apache.org%3E]).

'consumerPollThread()' in KafkaIO deliberately avoided catching runtime 
exceptions. It should handle them; failures do happen at runtime. 

They should result in an 'IOException' from start()/advance(); the runners will 
then report the error and close the readers properly. 

  was:
KafkaIO reader might stop reading from Kafka without any explicit error message 
if KafkaConsumer throws a runtime exception while polling for messages. One of 
the Dataflow customers encountered this issue (see [user@ 
thread|https://lists.apache.org/thread.html/c0cf8f45f567a0623592e2d8340f5288e3e774b59bca985aec410a81@%3Cuser.beam.apache.org%3E]).

'consumerPollThread()' in KafkaIO deliberately avoided catching runtime 
exceptions. It should handle them; failures do happen at runtime. 

They should result in an 'IOException' from start()/advance(); the runners will 
then report the error and close the readers properly. 


> KafkaIO reader should handle runtime exceptions from kafka client
> 
>
> Key: BEAM-5375
> URL: https://issues.apache.org/jira/browse/BEAM-5375
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.7.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> KafkaIO reader might stop reading from Kafka without any explicit error 
> message if KafkaConsumer throws a runtime exception while polling for 
> messages. One of the Dataflow customers encountered this issue (see [user@ 
> thread|https://lists.apache.org/thread.html/c0cf8f45f567a0623592e2d8340f5288e3e774b59bca985aec410a81@%3Cuser.beam.apache.org%3E]).
> 'consumerPollThread()' in KafkaIO deliberately avoided catching runtime 
> exceptions. It should handle them; failures do happen at runtime. 
> They should result in an 'IOException' from start()/advance(); the runners 
> will then report the error and close the readers properly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5408) (Java) Using Compression.GZIP with TFRecordIO

2018-09-17 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618351#comment-16618351
 ] 

Raghu Angadi commented on BEAM-5408:


The root cause for this is described in BEAM-5412. Both could be combined into 
one.

> (Java) Using Compression.GZIP with TFRecordIO
> -
>
> Key: BEAM-5408
> URL: https://issues.apache.org/jira/browse/BEAM-5408
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.4.0
>Reporter: haden lee
>Assignee: Chamikara Jayalath
>Priority: Major
>
> In short, `TFRecordIO.read()` does not seem to work if the entry being read is 
> longer than 8,192 bytes (in terms of byte[] length).  `TFRecordIO.write()` seems 
> to be OK with this though (based on some experiments). Perhaps there is some 
> hard-coded value for this specific length somewhere in the SDK, and I'm 
> wondering if it can be increased or parameterized. 
> [I've posted this on 
> StackOverflow|https://stackoverflow.com/questions/52284639/beam-java-sdk-with-tfrecord-and-compression-gzip],
>  but I was advised to report it here.
> Here are the details:
> We're using Beam Java SDK (and Google Cloud Dataflow to run batch jobs) a 
> lot, and we noticed something weird (possibly a bug?) when we tried to use 
> `TFRecordIO` with `Compression.GZIP`. We were able to come up with some 
> sample code that can reproduce the errors we face.
> To be clear, we are using Beam Java SDK 2.4.
> Suppose we have a `PCollection<byte[]>`, which can be, for instance, a 
> PCollection of proto messages in byte[] format.
>  We usually write this to GCS (Google Cloud Storage) using Base64 encoding 
> (newline delimited Strings) or using TFRecordIO (without compression). We 
> have had no issue reading the data from GCS in this manner for a very long 
> time (2.5+ years for the former and ~1.5 years for the latter).
> Recently, we tried `TFRecordIO` with `Compression.GZIP` option, and 
> *sometimes* we get an exception as the data is seen as invalid (while being 
> read). The data itself (the gzip files) is not corrupted, and we've tested 
> various things, and reached the following conclusion.
> When a `byte[]` that is being compressed under `TFRecordIO` is above a certain 
> threshold (I'd say at or above 8192), then 
> `TFRecordIO.read().withCompression(Compression.GZIP)` would not work.
>  Specifically, it will throw the following exception:
>  
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: Invalid data
> at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
> at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:642)
> at 
> org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:526)
> at 
> org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:426)
> at 
> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:473)
> at 
> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:468)
> at 
> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:261)
> at 
> org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:141)
> at 
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
> at 
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> This can be reproduced easily, so you can refer to the code at the end. You 
> will also see comments about the byte array length (as I tested with various 
> sizes, I concluded that 8192 is the magic number).
> So I'm wondering if this is a bug or known issue – I couldn't find anything 
> close to this on Apache Beam's Issue Tracker [here][1] but if there is 
> another forum/site I need to check, please let me know!
>  If this is indeed a bug, what would be the right channel to report this?
> —
>  The following code can reproduce the error we have.
> A successful run (with parameters 1, 39, 100) would show the following 
> message at the end:
> {code:java}
>  counter metrics from CountDoFn
> [counter] plain_base64_proto_array_len: 8126
> [counter] plain_base64_proto_in: 1
> [counter] plain_base64_proto_val_cnt: 

[jira] [Assigned] (BEAM-5412) TFRecordIO fails with records larger than 8K

2018-09-17 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-5412:
--

Assignee: Chamikara Jayalath  (was: Eugene Kirpichov)

> TFRecordIO fails with records larger than 8K
> 
>
> Key: BEAM-5412
> URL: https://issues.apache.org/jira/browse/BEAM-5412
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-text
>Affects Versions: 2.4.0
>Reporter: Raghu Angadi
>Assignee: Chamikara Jayalath
>Priority: Major
>
> This was reported on 
> [Stackoverflow|https://stackoverflow.com/questions/52284639/beam-java-sdk-with-tfrecord-and-compression-gzip].
>  TFRecordIO reader assumes a single call to {{channel.read()}} returns as 
> much as can fit in the input buffer. {{read()}} can return fewer bytes than 
> requested. Assert failure : 
> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L642



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-5412) TFRecordIO fails with records larger than 8K

2018-09-17 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-5412:
--

 Summary: TFRecordIO fails with records larger than 8K
 Key: BEAM-5412
 URL: https://issues.apache.org/jira/browse/BEAM-5412
 Project: Beam
  Issue Type: Bug
  Components: io-java-text
Affects Versions: 2.4.0
Reporter: Raghu Angadi
Assignee: Eugene Kirpichov


This was reported on 
[Stackoverflow|https://stackoverflow.com/questions/52284639/beam-java-sdk-with-tfrecord-and-compression-gzip].
 TFRecordIO reader assumes a single call to {{channel.read()}} returns as much 
as can fit in the input buffer. {{read()}} can return fewer bytes than 
requested. Assert failure : 
https://github.com/apache/beam/blob/release-2.4.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L642
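
For illustration, a minimal sketch of the kind of read loop that tolerates short 
reads (a sketch only, not the actual TFRecordIO code; the class and helper names 
are invented):

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

class ReadUtil {
  /**
   * Keeps calling read() until the buffer is full or the stream ends, since a
   * single channel.read() may legitimately return fewer bytes than requested.
   */
  static int readFully(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
    int total = 0;
    while (buffer.hasRemaining()) {
      int n = channel.read(buffer);
      if (n < 0) {
        // End of stream: report -1 only if nothing was read at all.
        return total == 0 ? -1 : total;
      }
      total += n;
    }
    return total;
  }
}
{code}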



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-5375) KafkaIO reader should handle runtime exceptions from kafka client

2018-09-12 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-5375:
--

 Summary: KafkaIO reader should handle runtime exceptions from kafka 
client
 Key: BEAM-5375
 URL: https://issues.apache.org/jira/browse/BEAM-5375
 Project: Beam
  Issue Type: Bug
  Components: io-java-kafka
Affects Versions: 2.7.0
Reporter: Raghu Angadi
Assignee: Raghu Angadi


KafkaIO reader might stop reading from Kafka without any explicit error message 
if KafkaConsumer throws a runtime exception while polling for messages. One of 
the Dataflow customers encountered this issue (see [user@ 
thread|https://lists.apache.org/thread.html/c0cf8f45f567a0623592e2d8340f5288e3e774b59bca985aec410a81@%3Cuser.beam.apache.org%3E]).

'consumerPollThread()' in KafkaIO deliberately avoided catching runtime 
exceptions. It should handle them; failures do happen at runtime. 

They should result in an 'IOException' from start()/advance(); the runners will 
then report the error and close the readers properly. 
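
For illustration, a minimal sketch of the suggested handling (a sketch only, not 
the actual KafkaIO code; the class, helper, and constant names are assumptions):

{code:java}
import java.io.IOException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

class PollSketch {
  private static final long POLL_TIMEOUT_MS = 1000;

  /**
   * Catches runtime exceptions from the consumer poll and surfaces them as
   * IOException, so that start()/advance() can propagate them to the runner,
   * which then reports the error and closes the reader.
   */
  static ConsumerRecords<byte[], byte[]> pollOnce(Consumer<byte[], byte[]> consumer)
      throws IOException {
    try {
      return consumer.poll(POLL_TIMEOUT_MS);
    } catch (RuntimeException e) {
      throw new IOException("Kafka consumer poll failed", e);
    }
  }
}
{code}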



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4803) Beam spark runner not working properly with kafka

2018-08-10 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576566#comment-16576566
 ] 

Raghu Angadi commented on BEAM-4803:


It is not configurable. It was chosen mainly as a 'reasonable value'  :). In 
most pipelines, Dataflow bundles are small enough that a split gets scheduled 
very often. Spurious closes due to the 1 minute timeout are not very common; 
they can happen if there is too much data buffered downstream. There was one 
issue reported on Stackoverflow where the user had set things up such that 
Kafka metrics were reported with a unique id for each instance of KafkaClient: 
[https://stackoverflow.com/questions/48765660/google-dataflow-idle-kafka-readers]

 

> Beam spark runner not working properly with kafka
> -
>
> Key: BEAM-4803
> URL: https://issues.apache.org/jira/browse/BEAM-4803
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Vivek Agarwal
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>
> We are running a beam stream processing job on a spark runner, which reads 
> from a kafka topic using kerberos authentication. We are using java-io-kafka 
> v2.4.0 to read from kafka topic in the pipeline. The issue is that the 
> kafkaIO client is continuously creating a new kafka consumer with specified 
> config, doing kerberos login every time. Also, there are spark streaming jobs 
> which get spawned for the unbounded source, every second or so even when 
> there is no data in the kafka topic. Log has these jobs-
> INFO SparkContext: Starting job: dstr...@sparkunboumdedsource.java:172
> We can see in the logs
> INFO MicrobatchSource: No cached reader found for split: 
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@2919a728]. Creating new 
> reader at checkpoint mark...
> And then it creates new consumer doing fresh kerberos login, which is 
> creating issues.
> We are unsure of what should be correct behavior here and why so many spark 
> streaming jobs are getting created. We tried the beam code with flink runner 
> and did not find this issue there. Can someone point to the correct settings 
> for using unbounded kafka source with spark runner using beam? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4803) Beam spark runner not working properly with kafka

2018-08-08 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573312#comment-16573312
 ] 

Raghu Angadi commented on BEAM-4803:


FYI, Dataflow applies an inactivity timeout of 1 minute to the cached readers. 
The worker closes a reader if the specific source split that the reader is 
associated with is not scheduled for 1 minute. Common scenarios where this 
happens: work moves to a different worker (due to autoscaling), or the 
pipeline is too busy with downstream work.

> Beam spark runner not working properly with kafka
> -
>
> Key: BEAM-4803
> URL: https://issues.apache.org/jira/browse/BEAM-4803
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Vivek Agarwal
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>
> We are running a beam stream processing job on a spark runner, which reads 
> from a kafka topic using kerberos authentication. We are using java-io-kafka 
> v2.4.0 to read from kafka topic in the pipeline. The issue is that the 
> kafkaIO client is continuously creating a new kafka consumer with specified 
> config, doing kerberos login every time. Also, there are spark streaming jobs 
> which get spawned for the unbounded source, every second or so even when 
> there is no data in the kafka topic. Log has these jobs-
> INFO SparkContext: Starting job: dstr...@sparkunboumdedsource.java:172
> We can see in the logs
> INFO MicrobatchSource: No cached reader found for split: 
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@2919a728]. Creating new 
> reader at checkpoint mark...
> And then it creates new consumer doing fresh kerberos login, which is 
> creating issues.
> We are unsure of what should be correct behavior here and why so many spark 
> streaming jobs are getting created. We tried the beam code with flink runner 
> and did not find this issue there. Can someone point to the correct settings 
> for using unbounded kafka source with spark runner using beam? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4803) Beam spark runner not working properly with kafka

2018-07-17 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547075#comment-16547075
 ] 

Raghu Angadi commented on BEAM-4803:


[~aromanenko], you have looked into Spark runner and KafkaIO issues before. 
Could you take a look? Please assign it to the default owner for the Spark 
runner if that is more appropriate. 

> Beam spark runner not working properly with kafka
> -
>
> Key: BEAM-4803
> URL: https://issues.apache.org/jira/browse/BEAM-4803
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
>Reporter: Vivek Agarwal
>Assignee: Alexey Romanenko
>Priority: Major
>
> We are running a beam stream processing job on a spark runner, which reads 
> from a kafka topic using kerberos authentication. We are using java-io-kafka 
> v2.4.0 to read from kafka topic in the pipeline. The issue is that the 
> kafkaIO client is continuously creating a new kafka consumer with specified 
> config, doing kerberos login every time. Also, there are spark streaming jobs 
> which get spawned for the unbounded source, every second or so even when 
> there is no data in the kafka topic. Log has these jobs-
> INFO SparkContext: Starting job: dstr...@sparkunboumdedsource.java:172
> We can see in the logs
> INFO MicrobatchSource: No cached reader found for split: 
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@2919a728]. Creating new 
> reader at checkpoint mark...
> And then it creates new consumer doing fresh kerberos login, which is 
> creating issues.
> We are unsure of what should be correct behavior here and why so many spark 
> streaming jobs are getting created. We tried the beam code with flink runner 
> and did not find this issue there. Can someone point to the correct settings 
> for using unbounded kafka source with spark runner using beam? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4803) Beam spark runner not working properly with kafka

2018-07-17 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-4803:
--

Assignee: Alexey Romanenko  (was: Raghu Angadi)

> Beam spark runner not working properly with kafka
> -
>
> Key: BEAM-4803
> URL: https://issues.apache.org/jira/browse/BEAM-4803
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
>Reporter: Vivek Agarwal
>Assignee: Alexey Romanenko
>Priority: Major
>
> We are running a beam stream processing job on a spark runner, which reads 
> from a kafka topic using kerberos authentication. We are using java-io-kafka 
> v2.4.0 to read from kafka topic in the pipeline. The issue is that the 
> kafkaIO client is continuously creating a new kafka consumer with specified 
> config, doing kerberos login every time. Also, there are spark streaming jobs 
> which get spawned for the unbounded source, every second or so even when 
> there is no data in the kafka topic. Log has these jobs-
> INFO SparkContext: Starting job: dstr...@sparkunboumdedsource.java:172
> We can see in the logs
> INFO MicrobatchSource: No cached reader found for split: 
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@2919a728]. Creating new 
> reader at checkpoint mark...
> And then it creates new consumer doing fresh kerberos login, which is 
> creating issues.
> We are unsure of what should be correct behavior here and why so many spark 
> streaming jobs are getting created. We tried the beam code with flink runner 
> and did not find this issue there. Can someone point to the correct settings 
> for using unbounded kafka source with spark runner using beam? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4192) Pubsub benchmarks in Nexmark

2018-07-13 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543462#comment-16543462
 ] 

Raghu Angadi commented on BEAM-4192:


Thanks. This is not urgent. For now I have removed 'Fix Versions'. I have a 
patch that I need to resume testing. Will look into it later.

> Pubsub benchmarks in Nexmark
> 
>
> Key: BEAM-4192
> URL: https://issues.apache.org/jira/browse/BEAM-4192
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.4.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
>
> Nexmark benchmarks with Pubsub are not well tested. For example, managing the 
> load generator job is broken, and it does not manage topics and subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4192) Pubsub benchmarks in Nexmark

2018-07-13 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-4192:
---
Fix Version/s: (was: 2.7.0)

> Pubsub benchmarks in Nexmark
> 
>
> Key: BEAM-4192
> URL: https://issues.apache.org/jira/browse/BEAM-4192
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.4.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
>
> Nexmark benchmarks with Pubsub are not well tested. For example, managing the 
> load generator job is broken, and it does not manage topics and subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-07-12 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541977#comment-16541977
 ] 

Raghu Angadi commented on BEAM-4038:


It is not a blocker. I removed 'Fix Version'. There is a PR, but it needs more 
updates; I have commented on it. Not sure when Geet can get around to it. This 
is not a widely requested feature yet. We could close this Jira for now and 
open a new one as required. Existing PR: https://github.com/apache/beam/pull/5287

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-07-12 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-4038:
---
Fix Version/s: (was: 2.6.0)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4632) KafkaIO should run in streaming mode over spark runner

2018-06-25 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522850#comment-16522850
 ] 

Raghu Angadi commented on BEAM-4632:


cc [~amitsela].

> KafkaIO should run in streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.4.0
>
>
> Dear sir,
> The following versions of related tools are used in my program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO]
> The description of my situation is as follows:
> The kafka broker is working, and kafkaIO.read (the consumer) is used to 
> capture data from the assigned broker ip (http://ubuntu7:9092).
> The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well:
>  .withBootstrapServers("kafka broker ip:9092")
>  .withTopic("kafkasink")
>  .withKeyDeserializer(IntegerDeserializer.class)
>  .withValueDeserializer(StringDeserializer.class)
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, the program does not run in 
> streaming mode and shuts down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the runner should 
> automatically set streaming mode; unfortunately, that failed for my program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords 
> (1000) or kafkaIO.read.withMaxReadTime (a Duration in seconds), my program 
> successfully executes in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] 
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether some of my 
> parameter settings for the spark runner are wrong.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to be informed and will 
> provide it to you as soon as possible.
> I will highly appreciate it if you can help me to deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4632) KafkaIO should run in streaming mode over spark runner

2018-06-25 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522533#comment-16522533
 ] 

Raghu Angadi commented on BEAM-4632:


Hi [~aromanenko], I am assigning this to you. Since it works with the 
`BoundedSource` wrapper when withMaxNumRecords(1000) is set, it is likely an 
issue between Beam unbounded sources and streaming Spark. I don't think I can 
get to this this week. Please assign it to the Spark runner owners if 
appropriate.

> KafkaIO should run in streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.4.0
>
>
> Dear sir,
> The following versions of related tools are used in my program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO]
> The description of my situation is as follows:
> The kafka broker is working, and kafkaIO.read (the consumer) is used to 
> capture data from the assigned broker ip (http://ubuntu7:9092).
> The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well:
>  .withBootstrapServers("kafka broker ip:9092")
>  .withTopic("kafkasink")
>  .withKeyDeserializer(IntegerDeserializer.class)
>  .withValueDeserializer(StringDeserializer.class)
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, the program does not run in 
> streaming mode and shuts down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the runner should 
> automatically set streaming mode; unfortunately, that failed for my program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords 
> (1000) or kafkaIO.read.withMaxReadTime (a Duration in seconds), my program 
> successfully executes in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] 
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether some of my 
> parameter settings for the spark runner are wrong.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to be informed and will 
> provide it to you as soon as possible.
> I will highly appreciate it if you can help me to deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4632) KafkaIO should run in streaming mode over spark runner

2018-06-25 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-4632:
--

Assignee: Alexey Romanenko  (was: Raghu Angadi)

> KafkaIO should run in streaming mode over spark runner
> -
>
> Key: BEAM-4632
> URL: https://issues.apache.org/jira/browse/BEAM-4632
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Alexey Romanenko
>Priority: Major
> Fix For: 2.4.0
>
>
> Dear sir,
> The following versions of related tools are used in my program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO]
> The description of my situation is as follows:
> The kafka broker is working, and kafkaIO.read (the consumer) is used to 
> capture data from the assigned broker ip (http://ubuntu7:9092).
> The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well:
>  .withBootstrapServers("kafka broker ip:9092")
>  .withTopic("kafkasink")
>  .withKeyDeserializer(IntegerDeserializer.class)
>  .withValueDeserializer(StringDeserializer.class)
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, the program does not run in 
> streaming mode and shuts down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the runner should 
> automatically set streaming mode; unfortunately, that failed for my program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords 
> (1000) or kafkaIO.read.withMaxReadTime (a Duration in seconds), my program 
> successfully executes in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] 
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether some of my 
> parameter settings for the spark runner are wrong.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to be informed and will 
> provide it to you as soon as possible.
> I will highly appreciate it if you can help me to deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-4631) KafkaIO should run in streaming mode over spark runner

2018-06-25 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-4631.

Resolution: Duplicate

Filed twice by mistake. Closing this in favor of its clone, BEAM-4632.

> KafkaIO should run in streaming mode over spark runner
> --
>
> Key: BEAM-4631
> URL: https://issues.apache.org/jira/browse/BEAM-4631
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-spark
>Affects Versions: 2.4.0
> Environment: Ubuntu 16.04.4 LTS
>Reporter: Rick Lin
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>
> Dear sir,
> The following versions of related tools are used in my program:
> ==
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==
> My programs (KafkaToKafka.java and StarterPipeline.java) are shown on my 
> github: [https://github.com/LinRick/beamkafkaIO]
> The description of my situation is as follows:
> The kafka broker is working, and kafkaIO.read (the consumer) is used to 
> capture data from the assigned broker ip (http://ubuntu7:9092).
> The user manual of the kafkaIO SDK (on the web: 
> [https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that 
> the following parameters need to be set, and then kafkaIO can work well:
>  .withBootstrapServers("kafka broker ip:9092")
>  .withTopic("kafkasink")
>  .withKeyDeserializer(IntegerDeserializer.class)
>  .withValueDeserializer(StringDeserializer.class)
> When I run my program with these settings on the direct runner, it performs 
> well and runs in streaming mode. *However, when I run the same code with the 
> same kafkaIO settings on the spark runner, the program does not run in 
> streaming mode and shuts down.* As mentioned on the website 
> [https://beam.apache.org/documentation/runners/spark/], the runner should 
> automatically set streaming mode; unfortunately, that failed for my program.
> On the other hand, if I set the parameter kafkaIO.read.withMaxNumRecords 
> (1000) or kafkaIO.read.withMaxReadTime (a Duration in seconds), my program 
> successfully executes in batch mode (batch processing).
> The steps for running StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline 
> -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] 
> /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this issue is a bug in kafkaIO or whether some of my 
> parameter settings for the spark runner are wrong.
> I really can't handle it, so I hope to get help from you.
> If any further information is needed, I am glad to be informed and will 
> provide it to you as soon as possible.
> I will highly appreciate it if you can help me to deal with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4521) Backlog metrics not showing up

2018-06-08 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506228#comment-16506228
 ] 

Raghu Angadi commented on BEAM-4521:


This is really a flink bug. All the Beam API methods are supposed to be 
invoked with the required context set up. A similar issue with the Flink 
runner came up in another context, where it accesses readers concurrently even 
though the spec explicitly disallows that ([PR 
3985|https://github.com/apache/beam/pull/3985#issuecomment-338051096]). We 
could report the backlog in advance() instead, but that would add unnecessary 
overhead for every single record. I would like to hear from the Flink runner 
authors about this. Note that these issues affect all unbounded sources, not 
just KafkaIO. 
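
For illustration, a minimal sketch of what reporting the backlog from advance() 
could look like (a sketch only; {{advanceInternal()}} and 
{{estimateBacklogBytes()}} are hypothetical helpers):

{code:java}
import java.io.IOException;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.SourceMetrics;

abstract class ReaderSketch {
  private final Gauge backlogBytes = SourceMetrics.backlogBytes();

  abstract boolean advanceInternal() throws IOException;

  abstract long estimateBacklogBytes();

  /**
   * Updates the gauge on the work-execution thread, where the runner has the
   * metrics context set up, instead of in getCheckpointMark().
   */
  public boolean advance() throws IOException {
    boolean hasNext = advanceInternal();
    backlogBytes.set(estimateBacklogBytes());
    return hasNext;
  }
}
{code}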

> Backlog metrics not showing up
> --
>
> Key: BEAM-4521
> URL: https://issues.apache.org/jira/browse/BEAM-4521
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0
>Reporter: Jozef Vilcek
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Hello,
> I wanted to track _backlog_bytes_ and _backlog_elements_ metrics from 
> SinkMetrics for kafka. I see in the code that KafkaUnboundedReader is 
> reporting them but was not able to get them visible (running on Flink).
> Metrics are reported here:
> [https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L668]
> In Flink runtime I noticed this message:
> {code:java}
> 2018-06-07 08:53:53,216 ERROR org.apache.beam.sdk.metrics.MetricsEnvironment  
>               - Unable to update metrics on the current thread. Most likely 
> caused by using metrics outside the managed work-execution thread.
> {code}
> I see that backlog is reported from getCheckpointMark(), which is done by 
> some other thread. Not sure why it is done there. 
> I tested locally to move it to the advance() method, where bytes_read is 
> reported and it worked. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4521) Backlog metrics not showing up

2018-06-08 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-4521:
--

Assignee: Aljoscha Krettek  (was: Raghu Angadi)

> Backlog metrics not showing up
> --
>
> Key: BEAM-4521
> URL: https://issues.apache.org/jira/browse/BEAM-4521
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0
>Reporter: Jozef Vilcek
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Hello,
> I wanted to track _backlog_bytes_ and _backlog_elements_ metrics from 
> SinkMetrics for kafka. I see in the code that KafkaUnboundedReader is 
> reporting them but was not able to get them visible (running on Flink).
> Metrics are reported here:
> [https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L668]
> In Flink runtime I noticed this message:
> {code:java}
> 2018-06-07 08:53:53,216 ERROR org.apache.beam.sdk.metrics.MetricsEnvironment  
>               - Unable to update metrics on the current thread. Most likely 
> caused by using metrics outside the managed work-execution thread.
> {code}
> I see that backlog is reported from getCheckpointMark(), which is done by 
> some other thread. Not sure why it is done there. 
> I tested locally to move it to the advance() method, where bytes_read is 
> reported and it worked. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4521) Backlog metrics not showing up

2018-06-08 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-4521:
---
Component/s: (was: io-java-kafka)
 runner-flink

> Backlog metrics not showing up
> --
>
> Key: BEAM-4521
> URL: https://issues.apache.org/jira/browse/BEAM-4521
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0
>Reporter: Jozef Vilcek
>Assignee: Raghu Angadi
>Priority: Minor
>
> Hello,
> I wanted to track _backlog_bytes_ and _backlog_elements_ metrics from 
> SinkMetrics for kafka. I see in the code that KafkaUnboundedReader is 
> reporting them but was not able to get them visible (running on Flink).
> Metrics are reported here:
> [https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L668]
> In Flink runtime I noticed this message:
> {code:java}
> 2018-06-07 08:53:53,216 ERROR org.apache.beam.sdk.metrics.MetricsEnvironment  
>               - Unable to update metrics on the current thread. Most likely 
> caused by using metrics outside the managed work-execution thread.
> {code}
> I see that backlog is reported from getCheckpointMark(), which is done by 
> some other thread. Not sure why it is done there. 
> I tested locally to move it to the advance() method, where bytes_read is 
> reported and it worked. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-4485) Incredibly difficult to use KakfaIO + TLS + DataflowRunner

2018-06-05 Thread Raghu Angadi (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-4485.

   Resolution: Not A Problem
Fix Version/s: Not applicable

> Incredibly difficult to use KakfaIO + TLS + DataflowRunner
> --
>
> Key: BEAM-4485
> URL: https://issues.apache.org/jira/browse/BEAM-4485
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-kafka
>Reporter: Andre
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: Not applicable
>
>
> When attempting to use KafkaIO.Read with DataflowRunner, I have hit a lot of 
> walls. The brokers need to be accessible both locally and from the dataflow 
> runner instances. This means, when using TLS authentication, the 
> keystore/truststore files need to be available locally and on the instances. 
> I programmatically add the files to the pipeline options with
> {noformat}
> List<String> filesToStage = 
> PipelineResources.detectClassPathResourcesToStage(IndicatorIngest.class.getClassLoader());
> filesToStage.add("trust.p12");
> filesToStage.add("server.p12");
> {noformat}
> but even when I do this, the remote file names are different. This means that 
> I need to determine the remote file name myself, like this
> {noformat}
> PackageAttributes.forFileToStage(new File(filepath), 
> filepath).getDestination().getName();
> {noformat}
> but that function is package private, so I need to wrap this call with a 
> custom class in org.apache.beam.runners.dataflow.util. When I calculate this 
> filename, I can use it to set the ssl..location, but this is the wrong 
> location locally, and it needs to be correct both locally and remotely. This 
> means in my main I need to calculate the local files remote names, copy them 
> to the local path with the same name, dynamically set the property to this 
> path, and programmatically add these files to be staged so they hopefully 
> have the same name on the worker. KafkaConsumer doesn't seem to provide any 
> other way to specify where to get these keys from.
> My question is: am I supposed to be jumping through all these hoops, or am I 
> doing something (or multiple things) completely wrong?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4485) Incredibly difficult to use KakfaIO + TLS + DataflowRunner

2018-06-05 Thread Raghu Angadi (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502336#comment-16502336
 ] 

Raghu Angadi commented on BEAM-4485:


Fortunately, each of the two problems you mentioned has an alternative:
 # Access on the local launcher machine: KafkaIO needs this in order to fetch 
the number of partitions for the topic. If you don't have access, you can 
instead provide the list of partitions explicitly to the reader; see the 
`withTopicPartitions()` api. Dataflow worker machines do need access.
 # Difficulty with providing key files: this was a limitation of the 
KafkaConsumer configuration. Setting ACLs in Kafka got much better in 0.10.2 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients]). 
It allows setting these parameters in the consumer config itself, which makes 
it simpler to use with Dataflow. KafkaIO allows setting pretty much any of the 
consumer config.

Does this help?
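
For illustration, a minimal sketch combining both alternatives (broker, topic, 
partitions, and credentials are placeholders):

{code:java}
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

class TlsReadSketch {
  static KafkaIO.Read<String, String> read() {
    return KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")
        // Alternative 1: list partitions explicitly so the launcher machine
        // does not need to reach the brokers to fetch partition metadata.
        .withTopicPartitions(ImmutableList.of(
            new TopicPartition("my_topic", 0),
            new TopicPartition("my_topic", 1)))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Alternative 2 (KIP-85): pass SASL/JAAS settings in the consumer
        // config itself instead of staging credential files on the workers.
        .updateConsumerProperties(ImmutableMap.<String, Object>of(
            "security.protocol", "SASL_SSL",
            "sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"user\" password=\"secret\";"));
  }
}
{code}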

> Incredibly difficult to use KakfaIO + TLS + DataflowRunner
> --
>
> Key: BEAM-4485
> URL: https://issues.apache.org/jira/browse/BEAM-4485
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-kafka
>Reporter: Andre
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: Not applicable
>
>
> When attempting to use KafkaIO.Read with DataflowRunner, I have hit a lot of 
> walls. The brokers need to be accessible both locally and from the dataflow 
> runner instances. This means, when using TLS authentication, the 
> keystore/truststore files need to be available locally and on the instances. 
> I programmatically add the files to the pipeline options with
> {noformat}
> List<String> filesToStage = 
> PipelineResources.detectClassPathResourcesToStage(IndicatorIngest.class.getClassLoader());
> filesToStage.add("trust.p12");
> filesToStage.add("server.p12");
> {noformat}
> but even when I do this, the remote file names are different. This means that 
> I need to determine the remote file name myself, like this
> {noformat}
> PackageAttributes.forFileToStage(new File(filepath), 
> filepath).getDestination().getName();
> {noformat}
> but that function is package private, so I need to wrap this call with a 
> custom class in org.apache.beam.runners.dataflow.util. When I calculate this 
> filename, I can use it to set the ssl..location, but this is the wrong 
> location locally, and it needs to be correct both locally and remotely. This 
> means in my main I need to calculate the local files remote names, copy them 
> to the local path with the same name, dynamically set the property to this 
> path, and programmatically add these files to be staged so they hopefully 
> have the same name on the worker. KafkaConsumer doesn't seem to provide any 
> other way to specify where to get these keys from.
> My question is: am I supposed to be jumping through all these hoops, or am I 
> doing something (or multiple things) completely wrong?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2980) BagState.isEmpty needs a tighter spec

2018-05-23 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488425#comment-16488425
 ] 

Raghu Angadi commented on BEAM-2980:


Is there a pointer to the rationale for the existence of {{isEmpty()}}? 

> BagState.isEmpty needs a tighter spec
> -
>
> Key: BEAM-2980
> URL: https://issues.apache.org/jira/browse/BEAM-2980
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Kenneth Knowles
>Assignee: Daniel Mills
>Priority: Major
>
> Consider the following:
> {code}
> BagState<Bizzle> myBag = // empty
> ReadableState<Boolean> isMyBagEmpty = myBag.isEmpty();
> myBag.add(bizzle);
> boolean empty = isMyBagEmpty.read();
> {code}
> Should {{empty}} be true or false? We need a consistent answer, across all 
> kinds of state, when snapshots are required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4372) Need an undeprecated Reshuffle transform

2018-05-21 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-4372:
--

 Summary: Need an undeprecated Reshuffle transform
 Key: BEAM-4372
 URL: https://issues.apache.org/jira/browse/BEAM-4372
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Raghu Angadi
Assignee: Kenneth Knowles


{{Reshuffle}} transform is a convenient wrapper around GBK. It preserves user 
windowing correctly, which is not always obvious to users. Its usage has grown 
over the many months since it was deprecated, even though it was deprecated 
because it makes potentially incorrect/unportable use of GroupByKey semantics 
easier. See [dev 
thread|https://lists.apache.org/thread.html/820064a81c86a6d44f21f0d6c68ea3f46cec151e5e1a0b52eeed3fbf@%3Cdev.beam.apache.org%3E]
 for more discussion.

There is broad agreement that we do need a transform meant for its commonly 
used purpose in user pipelines: to control parallelism (e.g. to limit the 
number of connections to a DB). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-05-01 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459481#comment-16459481
 ] 

Raghu Angadi commented on BEAM-4038:


In Kafka, {{ConsumerRecord}} and {{ProducerRecord}} are logically different 
kinds of records; a producer record does not have an offset, for example. I 
think it is better to keep them separate here as well, in order not to confuse 
the read and write cases. We could create our own shadow 
{{KafkaProducerRecord}}, but that is probably not necessary; it is simpler to 
use Kafka's {{ProducerRecord}} directly. 

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4192) Pubsub benchmarks in Nexmark

2018-04-30 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-4192:
--

 Summary: Pubsub benchmarks in Nexmark
 Key: BEAM-4192
 URL: https://issues.apache.org/jira/browse/BEAM-4192
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Affects Versions: 2.4.0
Reporter: Raghu Angadi
Assignee: Raghu Angadi
 Fix For: 2.5.0


Nexmark benchmarks with Pubsub are not well tested. For example, managing the 
load generator job is broken, and it does not manage topics and subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-29 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-4038:
---
Priority: Major  (was: Minor)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-29 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458325#comment-16458325
 ] 

Raghu Angadi commented on BEAM-4038:


The interfaces with function callbacks are problematic, since we need to decide 
what context to provide to the function. E.g. 
{{KafkaPublishTimestampFunction}} provides {{elementTimestamp}}. Why? Because we 
think that is probably what the user wants to know along with the KV. Recently 
we deprecated the old timestamp functions for the reader in order to support 
watermarks better. At least in the case of the reader, there is no alternative 
to having a function callback, since we need to set the watermark/timestamp 
_before_ the user gets to see the record. 

Long story short, I think it is better to avoid another function to set 
headers. It will be a similar story when more fields are added to 
{{KafkaRecord}}. In fact, I think we should remove 
{{KafkaPublishTimestampFunction}}. 

How about adding {{KafkaIO.writeRecords()}}, a 
{{PTransform<PCollection<ProducerRecord>, PDone>}}? This way the user builds 
the ProducerRecord any way they see fit. We can provide an Avro coder for 
Kafka's {{ProducerRecord}}. We can handle older kafka versions by ignoring 
fields that are not present in old versions. We can add a coder very similar to 
{{KafkaRecordCoder}}.

This is more work than adding a function, but I think it improves the 
flexibility of the writer now and in the future. 
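
For illustration, a minimal sketch of what such a transform could look like from 
the user's side (hypothetical at the time of this comment; the 
{{writeRecords()}} builder shown here is an assumption):

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class WriteRecordsSketch {
  static void write(PCollection<KV<String, String>> input) {
    // The user builds the ProducerRecord directly, including headers, instead
    // of KafkaIO exposing a separate callback for each new field.
    PCollection<ProducerRecord<String, String>> records =
        input.apply(
            MapElements.via(
                new SimpleFunction<KV<String, String>, ProducerRecord<String, String>>() {
                  @Override
                  public ProducerRecord<String, String> apply(KV<String, String> kv) {
                    ProducerRecord<String, String> record =
                        new ProducerRecord<>("my_topic", kv.getKey(), kv.getValue());
                    record.headers().add("source", "beam".getBytes(StandardCharsets.UTF_8));
                    return record;
                  }
                }));

    // A coder for ProducerRecord would be registered, as discussed above.
    records.apply(
        KafkaIO.<String, String>writeRecords() // hypothetical API, as proposed
            .withBootstrapServers("broker:9092")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));
  }
}
{code}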

 

 

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Geet Kumar
>Priority: Minor
> Fix For: 2.5.0
>
>  Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-3851) Support element timestamps while publishing to Kafka.

2018-04-29 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-3851.

Resolution: Fixed

> Support element timestamps while publishing to Kafka.
> -
>
> Key: BEAM-3851
> URL: https://issues.apache.org/jira/browse/BEAM-3851
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-kafka
>Affects Versions: 2.3.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> KafkaIO sink should support using the input element's timestamp for the message 
> published to Kafka. Otherwise there is no way for the user to influence the 
> timestamp of the messages in the Kafka sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-591) Better handling of watermark in KafkaIO

2018-04-29 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-591.
---
   Resolution: Fixed
Fix Version/s: 2.4.0

> Better handling of watermark in KafkaIO
> ---
>
> Key: BEAM-591
> URL: https://issues.apache.org/jira/browse/BEAM-591
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Right now the default watermark in KafkaIO is the same as the timestamp of the record. 
> The main problem with this is that the watermark does not change if there aren't any 
> new records on the topic. This can hold up many open windows. 
> The record timestamp by default is set to processing time (i.e. when the 
> runner reads a record from the Kafka reader).
> A user can provide functions to calculate watermark and record timestamps. 
> There are a few concerns with current design:
> * What should happen when a kafka topic is idle:
>   ** in default case, I think watermark should advance to current time.
>   ** What should happen when user has provided a function to calculate record 
> timestamp? 
>*** Should the watermark stay same as record timestamp?
>*** same when user has provided own watermark function? 
> * Are the current semantics of user provided watermark function correct?
>   ** -it is run once for each record read-.
>   ** -Should it instead be run inside {{getWatermark()}} called by the runner 
> (we could still provide the last user record, and its timestamp)-.
>   ** It does run inside {{getWatermark()}}. should we pass current record 
> timestamp in addition to the record?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-591) Better handling of watermark in KafkaIO

2018-04-29 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458307#comment-16458307
 ] 

Raghu Angadi commented on BEAM-591:
---

See the following methods added to `KafkaIO.Read` in the 3 pull requests attached to 
this jira; a short usage sketch follows the list:
 * {{withLogAppendTime()}}
 * {{withCreateTime()}}
 * {{withProcessingTime()}}
 * {{withTimestampPolicyFactory()}}
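
For illustration, a minimal read sketch using one of these (broker and topic names are placeholders; meaningful log-append timestamps assume the topic is configured with {{message.timestamp.type=LogAppendTime}}):

{code:java}
import org.apache.kafka.common.serialization.StringDeserializer;

// Record timestamps come from broker-assigned log-append time, so the
// watermark can advance even while a partition is idle.
p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("events")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withLogAppendTime());
{code}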


> Better handling of watermark in KafkaIO
> ---
>
> Key: BEAM-591
> URL: https://issues.apache.org/jira/browse/BEAM-591
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Right now the default watermark in KafkaIO is the same as the timestamp of the record. 
> The main problem with this is that the watermark does not change if there aren't any 
> new records on the topic. This can hold up many open windows. 
> The record timestamp by default is set to processing time (i.e. when the 
> runner reads a record from the Kafka reader).
> A user can provide functions to calculate watermark and record timestamps. 
> There are a few concerns with current design:
> * What should happen when a kafka topic is idle:
>   ** in default case, I think watermark should advance to current time.
>   ** What should happen when user has provided a function to calculate record 
> timestamp? 
>*** Should the watermark stay same as record timestamp?
>*** same when user has provided own watermark function? 
> * Are the current semantics of user provided watermark function correct?
>   ** -it is run once for each record read-.
>   ** -Should it instead be run inside {{getWatermark()}} called by the runner 
> (we could still provide the last user record, and its timestamp)-.
>   ** It does run inside {{getWatermark()}}. should we pass current record 
> timestamp in addition to the record?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4086) KafkaIOTest is flaky

2018-04-19 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1604#comment-1604
 ] 

Raghu Angadi commented on BEAM-4086:


{{testUnboundedSourceWithoutBoundedWrapper()}} is the test that occasionally 
hangs. Sent a fix for the CPU-spinning MockConsumer; as a side effect it seems to 
avoid this flake. 2400 sequential gradle test runs passed.

I still don't know the root cause, so I will keep this bug open. 

> KafkaIOTest is flaky
> 
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.5.0
>Reporter: Ismaël Mejía
>Assignee: Raghu Angadi
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Noticed this while trying to do a simple change in KafkaIO this morning and 
> corroborated with other contributors. If you run `./gradlew -p 
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the 
> times. However it passes ok with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4086) KafkaIOTest is flaky

2018-04-18 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16443143#comment-16443143
 ] 

Raghu Angadi commented on BEAM-4086:


Never mind, I was running gradle. It did finally run into this.

> KafkaIOTest is flaky
> 
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.5.0
>Reporter: Ismaël Mejía
>Assignee: Raghu Angadi
>Priority: Major
>
> Noticed this while trying to do a simple change in KafkaIO this morning and 
> corroborated with other contributors. If you run `./gradlew -p 
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the 
> times. However it passes ok with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4086) KafkaIO is flaky with gradle build

2018-04-18 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442915#comment-16442915
 ] 

Raghu Angadi commented on BEAM-4086:


[~iemejia], assigning it to you since it happened frequently in your 
environment. I am running the build in a loop on my desktop and so far have not 
been able to reproduce it. Could you attach a jstack of the tests when 
this happens? Also check whether the process is using a lot of CPU. 

> KafkaIO is flaky with gradle build
> --
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.5.0
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Major
>
> Noticed this while trying to do a simple change in KafkaIO this morning and 
> corroborated with other contributors. If you run `./gradlew -p 
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the 
> times. However it passes ok with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4086) KafkaIO is flaky with gradle build

2018-04-18 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-4086:
--

Assignee: Ismaël Mejía  (was: Alexey Romanenko)

> KafkaIO is flaky with gradle build
> --
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.5.0
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Major
>
> Noticed this while trying to do a simple change in KafkaIO this morning and 
> corroborated with other contributors. If you run `./gradlew -p 
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the 
> times. However it passes ok with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4086) KafkaIO is flaky with gradle build

2018-04-18 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-4086:
--

Assignee: Alexey Romanenko  (was: Raghu Angadi)

> KafkaIO is flaky with gradle build
> --
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.5.0
>Reporter: Ismaël Mejía
>Assignee: Alexey Romanenko
>Priority: Major
>
> Noticed this while trying to do a simple change in KafkaIO this morning and 
> corroborated with other contributors. If you run `./gradlew -p 
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the 
> times. However it passes ok with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4086) KafkaIO is flaky with gradle build

2018-04-18 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442873#comment-16442873
 ] 

Raghu Angadi commented on BEAM-4086:


[~aromanenko], I am not able to reproduce it on my Debian desktop. Could you 
try it and send the stacktrace?

> KafkaIO is flaky with gradle build
> --
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.5.0
>Reporter: Ismaël Mejía
>Assignee: Raghu Angadi
>Priority: Major
>
> Noticed this while trying to do a simple change in KafkaIO this morning and 
> corroborated with other contributors. If you run `./gradlew -p 
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the 
> times. However it passes ok with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4086) KafkaIO is flaky with gradle build

2018-04-18 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-4086:
--

Assignee: Raghu Angadi

> KafkaIO is flaky with gradle build
> --
>
> Key: BEAM-4086
> URL: https://issues.apache.org/jira/browse/BEAM-4086
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.5.0
>Reporter: Ismaël Mejía
>Assignee: Raghu Angadi
>Priority: Major
>
> Noticed this while trying to do a simple change in KafkaIO this morning and 
> corroborated with other contributors. If you run `./gradlew -p 
> sdks/java/io/kafka/ clean build` it blocks indefinitely at least 1/3 of the 
> times. However it passes ok with Maven.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-11 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16434033#comment-16434033
 ] 

Raghu Angadi commented on BEAM-4038:


Thanks. Will be happy to review. We need to make sure it works with older 
versions of kafka (0.9.x). See ConsumerSpEL#getRecordTimestamp() for an example.

Regarding write support, I am not sure how KV can be used to set a header without a 
code change in KafkaIO; the KafkaWriter needs to support it. We can do read 
support first.
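
For illustration, a sketch of the kind of version guard involved (the class and method wiring here are invented for the example, not the actual ConsumerSpEL code; {{ConsumerRecord#headers()}} only exists in kafka-clients 0.11+):

{code:java}
import java.lang.reflect.Method;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Illustrative only: look up headers() reflectively so the code still loads
 *  against older kafka-clients jars (e.g. 0.9.x) that lack the method. */
class HeadersCompat {
  private static final Method HEADERS_METHOD = findHeadersMethod();

  private static Method findHeadersMethod() {
    try {
      return ConsumerRecord.class.getMethod("headers");
    } catch (NoSuchMethodException e) {
      return null; // running against an older client
    }
  }

  /** Returns the record's Headers, or null when the client predates headers.
   *  Declared as Object so no 0.11+ type appears in the signature. */
  static Object headersOrNull(ConsumerRecord<?, ?> record) {
    if (HEADERS_METHOD == null) {
      return null;
    }
    try {
      return HEADERS_METHOD.invoke(record);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }
}
{code}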

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-10 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-4038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16432658#comment-16432658
 ] 

Raghu Angadi commented on BEAM-4038:


Reader support
 * I think this is more critical, and trivial to add: we just need to add a headers 
field to KafkaRecord in KafkaIO (a read-side sketch follows below).

Writer support
 * We need to decide how the user is going to provide the headers. It is 
probably better to let the user write KafkaRecord in addition to KV<>.
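
A hedged sketch of what reading headers could look like, assuming a {{getHeaders()}} accessor is added to {{KafkaRecord}} (the accessor name is an assumption):

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

// Keep the KafkaRecord metadata (no withoutMetadata()) so headers are visible.
PCollection<KafkaRecord<String, String>> records =
    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker_1:9092")
        .withTopic("events")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));

records.apply(ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // getHeaders() is the assumed accessor; Header is the Kafka client type.
    Header traceId = c.element().getHeaders().lastHeader("trace-id");
    if (traceId != null) {
      c.output(new String(traceId.value(), StandardCharsets.UTF_8));
    }
  }
}));
{code}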

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-3703) java.io.IOException: KafkaWriter : failed to send 1 records (since last report)

2018-03-15 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-3703.

Resolution: Not A Bug

> java.io.IOException: KafkaWriter : failed to send 1 records (since last 
> report)
> ---
>
> Key: BEAM-3703
> URL: https://issues.apache.org/jira/browse/BEAM-3703
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-flink
>Affects Versions: 2.2.0
>Reporter: jagdeep sihota
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: Not applicable
>
>
> I am trying to read from a file and write to Kafka on Google Cloud, and I am 
> getting the following error:
>  
> {code}
> org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter 
> : failed to send 1 records (since last report)
>at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
>at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
>at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since 
> last report)
>at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
>at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update 
> metadata after 60000 ms.
> {code}
>  {code}
> .apply(KafkaIO.write()
>     .withBootstrapServers("ip1:9092,ip2:9092")
>     .withTopic("feed")
>     .withValueSerializer(StringSerializer.class)
>     .withKeySerializer(StringSerializer.class)
>     //.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))
>     //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))
>     .values() // writes values to Kafka with default key
> {code}
>  
> Kafka is running on Google Cloud (Bitnami image) and I am using the Flink runner.
> How do I pass security information to KafkaIO?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3703) java.io.IOException: KafkaWriter : failed to send 1 records (since last report)

2018-03-15 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400786#comment-16400786
 ] 

Raghu Angadi commented on BEAM-3703:


[~jsihota], most likely this is a network ACL issue. Make sure your VMs 
(dataflow workers) can access the Kafka cluster. For security, you can configure 
SASL by setting the appropriate client/producer config: 
[https://docs.confluent.io/current/tutorials/security_tutorial.html#clients]

I will close this for now; please open a new Jira or ask about it on 
stackoverflow. 
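
For example, SASL settings can be passed straight through to the producer config (all values below are placeholders for your cluster):

{code:java}
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.StringSerializer;

strings.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("ip1:9092,ip2:9092")
    .withTopic("feed")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class)
    // Producer-level security settings, forwarded as-is to the Kafka client.
    .updateProducerProperties(ImmutableMap.of(
        "security.protocol", "SASL_SSL",
        "sasl.mechanism", "PLAIN",
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"alice\" password=\"alice-secret\";"))
    .values());
{code}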

> java.io.IOException: KafkaWriter : failed to send 1 records (since last 
> report)
> ---
>
> Key: BEAM-3703
> URL: https://issues.apache.org/jira/browse/BEAM-3703
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-flink
>Affects Versions: 2.2.0
>Reporter: jagdeep sihota
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: Not applicable
>
>
> I am trying to read from a file and write to Kafka on Google Cloud, and I am 
> getting the following error:
>  
> {code}
> org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter 
> : failed to send 1 records (since last report)
>at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
>at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
>at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since 
> last report)
>at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
>at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update 
> metadata after 60000 ms.
> {code}
>  {code}
> .apply(KafkaIO.write()
>     .withBootstrapServers("ip1:9092,ip2:9092")
>     .withTopic("feed")
>     .withValueSerializer(StringSerializer.class)
>     .withKeySerializer(StringSerializer.class)
>     //.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))
>     //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))
>     .values() // writes values to Kafka with default key
> {code}
>  
> Kafka is running on Google Cloud (Bitnami image) and I am using the Flink runner.
> How do I pass security information to KafkaIO?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3851) Support element timestamps while publishing to Kafka.

2018-03-14 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-3851:
--

 Summary: Support element timestamps while publishing to Kafka.
 Key: BEAM-3851
 URL: https://issues.apache.org/jira/browse/BEAM-3851
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Affects Versions: 2.3.0
Reporter: Raghu Angadi
Assignee: Raghu Angadi
 Fix For: 2.4.0


KafkaIO sink should support using the input element's timestamp for the message 
published to Kafka. Otherwise there is no way for the user to influence the 
timestamp of the messages in the Kafka sink.
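
A sketch of one possible shape for the API (the {{withInputTimestamp()}} method name is illustrative, pending the actual fix):

{code:java}
import org.apache.kafka.common.serialization.StringSerializer;

PCollection<KV<String, String>> kvs = ...;

kvs.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class)
    // Stamp each outgoing Kafka message with the Beam element's event
    // timestamp instead of the wall-clock publish time.
    .withInputTimestamp());
{code}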



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3718) ClassNotFoundException on CloudResourceManager$Builder

2018-03-06 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388279#comment-16388279
 ] 

Raghu Angadi commented on BEAM-3718:


Assigned it to [~chamikara] for triaging. Looks like one of the GCP dependencies 
is missing or mismatched.

> ClassNotFoundException on CloudResourceManager$Builder
> --
>
> Key: BEAM-3718
> URL: https://issues.apache.org/jira/browse/BEAM-3718
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Yunis Arif Said
>Assignee: Chamikara Jayalath
>Priority: Trivial
>
> In a Spring Boot application running Google Cloud Dataflow code, the pipeline 
> takes data from Google Pub/Sub, transforms the incoming data, and outputs the result to 
> BigQuery for storage. The code does not have any syntax errors. The problem 
> is that when the application is run, the following exception is thrown. 
>  
> {code:java}
>  Exception in thread "main" java.lang.RuntimeException: Failed to construct 
> instance from factory method DataflowRunner#fromOptions(interface 
> org.apache.beam.sdk.options.PipelineOptions)
>  at 
> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:233)
>  at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:162)
>  at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:52)
>  at org.apache.beam.sdk.Pipeline.create(Pipeline.java:142)
>  at com.trackers.exlon.ExlonApplication.main(ExlonApplication.java:69)
>  
>  Caused by: java.lang.reflect.InvocationTargetException
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:222)
>  ... 4 more
> Caused by: java.lang.NoClassDefFoundError: 
> com/google/api/services/cloudresourcemanager/CloudResourceManager$Builder
>  at 
> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.newCloudResourceManagerClient(GcpOptions.java:369)
>  at 
> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:240)
>  at 
> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:228)
>  at 
> org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
>  at 
> org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
>  at 
> org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:156)
>  at com.sun.proxy.$Proxy85.getGcpTempLocation(Unknown Source)
>  at 
> org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:223)
>  ... 9 more
> Caused by: java.lang.ClassNotFoundException: 
> com.google.api.services.cloudresourcemanager.CloudResourceManager$Builder
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  ... 17 more
> {code}
> Maven dependency tree:
> {code:java}
> [INFO] +- 
> org.springframework.boot:spring-boot-starter-webflux:jar:2.0.0.RC1:compile
>  [INFO] | +- 
> org.springframework.boot:spring-boot-starter:jar:2.0.0.RC1:compile
>  [INFO] | | +- org.springframework.boot:spring-boot:jar:2.0.0.RC1:compile
>  [INFO] | | | \- org.springframework:spring-context:jar:5.0.3.RELEASE:compile
>  [INFO] | | | +- org.springframework:spring-aop:jar:5.0.3.RELEASE:compile
>  [INFO] | | | \- 
> org.springframework:spring-expression:jar:5.0.3.RELEASE:compile
>  [INFO] | | +- 
> org.springframework.boot:spring-boot-autoconfigure:jar:2.0.0.RC1:compile
>  [INFO] | | +- 
> org.springframework.boot:spring-boot-starter-logging:jar:2.0.0.RC1:compile
>  [INFO] | | | +- ch.qos.logback:logback-classic:jar:1.2.3:compile
>  [INFO] | | | | \- ch.qos.logback:logback-core:jar:1.2.3:compile
>  [INFO] | | | +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.10.0:compile
>  [INFO] | | | | \- org.apache.logging.log4j:log4j-api:jar:2.10.0:compile
>  [INFO] | | | \- org.slf4j:jul-to-slf4j:jar:1.7.25:compile
>  [INFO] | | +- javax.annotation:javax.annotation-api:jar:1.3.1:compile
>  [INFO] | | \- org.yaml:snakeyaml:jar:1.19:runtime
>  [INFO] | +- 
> org.springframework.boot:spring-boot-starter-json:jar:2.0.0.RC1:compile
>  [INFO] | | +- 
> com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.2:compile
>  [INFO] | | +- 
> com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.2:compile
>  [INFO] | | 

[jira] [Assigned] (BEAM-3718) ClassNotFoundException on CloudResourceManager$Builder

2018-03-06 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-3718:
--

Assignee: Chamikara Jayalath  (was: Raghu Angadi)

> ClassNotFoundException on CloudResourceManager$Builder
> --
>
> Key: BEAM-3718
> URL: https://issues.apache.org/jira/browse/BEAM-3718
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Yunis Arif Said
>Assignee: Chamikara Jayalath
>Priority: Trivial
>
> In a Spring Boot application running Google Cloud Dataflow code, the pipeline 
> takes data from Google Pub/Sub, transforms the incoming data, and outputs the result to 
> BigQuery for storage. The code does not have any syntax errors. The problem 
> is that when the application is run, the following exception is thrown. 
>  
> {code:java}
>  Exception in thread "main" java.lang.RuntimeException: Failed to construct 
> instance from factory method DataflowRunner#fromOptions(interface 
> org.apache.beam.sdk.options.PipelineOptions)
>  at 
> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:233)
>  at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:162)
>  at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:52)
>  at org.apache.beam.sdk.Pipeline.create(Pipeline.java:142)
>  at com.trackers.exlon.ExlonApplication.main(ExlonApplication.java:69)
>  
>  Caused by: java.lang.reflect.InvocationTargetException
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:222)
>  ... 4 more
> Caused by: java.lang.NoClassDefFoundError: 
> com/google/api/services/cloudresourcemanager/CloudResourceManager$Builder
>  at 
> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.newCloudResourceManagerClient(GcpOptions.java:369)
>  at 
> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:240)
>  at 
> org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:228)
>  at 
> org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
>  at 
> org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
>  at 
> org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:156)
>  at com.sun.proxy.$Proxy85.getGcpTempLocation(Unknown Source)
>  at 
> org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:223)
>  ... 9 more
> Caused by: java.lang.ClassNotFoundException: 
> com.google.api.services.cloudresourcemanager.CloudResourceManager$Builder
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  ... 17 more
> {code}
> Maven dependency tree:
> {code:java}
> [INFO] +- 
> org.springframework.boot:spring-boot-starter-webflux:jar:2.0.0.RC1:compile
>  [INFO] | +- 
> org.springframework.boot:spring-boot-starter:jar:2.0.0.RC1:compile
>  [INFO] | | +- org.springframework.boot:spring-boot:jar:2.0.0.RC1:compile
>  [INFO] | | | \- org.springframework:spring-context:jar:5.0.3.RELEASE:compile
>  [INFO] | | | +- org.springframework:spring-aop:jar:5.0.3.RELEASE:compile
>  [INFO] | | | \- 
> org.springframework:spring-expression:jar:5.0.3.RELEASE:compile
>  [INFO] | | +- 
> org.springframework.boot:spring-boot-autoconfigure:jar:2.0.0.RC1:compile
>  [INFO] | | +- 
> org.springframework.boot:spring-boot-starter-logging:jar:2.0.0.RC1:compile
>  [INFO] | | | +- ch.qos.logback:logback-classic:jar:1.2.3:compile
>  [INFO] | | | | \- ch.qos.logback:logback-core:jar:1.2.3:compile
>  [INFO] | | | +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.10.0:compile
>  [INFO] | | | | \- org.apache.logging.log4j:log4j-api:jar:2.10.0:compile
>  [INFO] | | | \- org.slf4j:jul-to-slf4j:jar:1.7.25:compile
>  [INFO] | | +- javax.annotation:javax.annotation-api:jar:1.3.1:compile
>  [INFO] | | \- org.yaml:snakeyaml:jar:1.19:runtime
>  [INFO] | +- 
> org.springframework.boot:spring-boot-starter-json:jar:2.0.0.RC1:compile
>  [INFO] | | +- 
> com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.2:compile
>  [INFO] | | +- 
> com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.2:compile
>  [INFO] | | \- 
> com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.2:compile
>  

[jira] [Resolved] (BEAM-3705) ApproximateUnique discards accumulated data with multiple firings.

2018-03-05 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-3705.

   Resolution: Fixed
Fix Version/s: 2.4.0

> ApproximateUnique discards accumulated data with multiple firings. 
> ---
>
> Key: BEAM-3705
> URL: https://issues.apache.org/jira/browse/BEAM-3705
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> `extractOutput()` on `ApproximateUniqueCombineFn` resets the accumulated 
> value. This discards accumulated state, making subsequent firings 
> incorrect.
>  
> First reported in https://stackoverflow.com/questions/48698028/. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-3754) KAFKA - Can't set commitOffsetsInFinalizeEnabled to false with KafkaIO.readBytes()

2018-03-05 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-3754.

   Resolution: Fixed
Fix Version/s: 2.4.0

> KAFKA - Can't set commitOffsetsInFinalizeEnabled to false with 
> KafkaIO.readBytes()
> --
>
> Key: BEAM-3754
> URL: https://issues.apache.org/jira/browse/BEAM-3754
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.3.0
> Environment: Dataflow pipeline using Kafka as a Sink
>Reporter: Benjamin BENOIST
>Assignee: Raghu Angadi
>Priority: Minor
>  Labels: patch
> Fix For: 2.4.0
>
>   Original Estimate: 2h
>  Time Spent: 0.5h
>  Remaining Estimate: 1.5h
>
> Beam v2.3 introduces finalized offsets, in order to reduce the gaps or 
> duplicate processing of records while restarting a pipeline.
> _read()_ sets this parameter to false [by 
> default|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L307]
>  but _readBytes()_ 
> [doesn't|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L282],
>  thus creating an exception:
> {noformat}
> Exception in thread "main" java.lang.IllegalStateException: Missing required 
> properties: commitOffsetsInFinalizeEnabled
>      at 
> org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:344)
>      at 
> org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:291){noformat}
> The parameter can be set to true with _commitOffsetsInFinalize()_ but never 
> to false.
> Using _read()_ in the definition of _readBytes()_ could prevent this kind of 
> error in the future:
> {code:java}
> public static Read<byte[], byte[]> readBytes() {
>   return read()
>       .setKeyDeserializer(ByteArrayDeserializer.class)
>       .setValueDeserializer(ByteArrayDeserializer.class)
>       .build();
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3754) KAFKA - Can't set commitOffsetsInFinalizeEnabled to false with KafkaIO.readBytes()

2018-03-02 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384330#comment-16384330
 ] 

Raghu Angadi commented on BEAM-3754:


Just saw this. You are correct, thanks for reporting it. They should never have 
duplicated code; I am not even sure when {{readBytes()}} was added. Sent a 
fix in https://github.com/apache/beam/pull/4792

> KAFKA - Can't set commitOffsetsInFinalizeEnabled to false with 
> KafkaIO.readBytes()
> --
>
> Key: BEAM-3754
> URL: https://issues.apache.org/jira/browse/BEAM-3754
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.3.0
> Environment: Dataflow pipeline using Kafka as a Sink
>Reporter: Benjamin BENOIST
>Assignee: Raghu Angadi
>Priority: Minor
>  Labels: patch
>   Original Estimate: 2h
>  Time Spent: 10m
>  Remaining Estimate: 1h 50m
>
> Beam v2.3 introduces finalized offsets, in order to reduce the gaps or 
> duplicate processing of records while restarting a pipeline.
> _read()_ sets this parameter to false [by 
> default|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L307]
>  but _readBytes()_ 
> [doesn't|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L282],
>  thus creating an exception:
> {noformat}
> Exception in thread "main" java.lang.IllegalStateException: Missing required 
> properties: commitOffsetsInFinalizeEnabled
>      at 
> org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:344)
>      at 
> org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:291){noformat}
> The parameter can be set to true with _commitOffsetsInFinalize()_ but never 
> to false.
> Using _read()_ in the definition of _readBytes()_ could prevent this kind of 
> error in the future:
> {code:java}
> public static Read<byte[], byte[]> readBytes() {
>   return read()
>       .setKeyDeserializer(ByteArrayDeserializer.class)
>       .setValueDeserializer(ByteArrayDeserializer.class)
>       .build();
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-3770) The problem of kafkaIO sdk for data latency

2018-03-02 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-3770.

Resolution: Not A Bug

> The problem of kafkaIO sdk for data latency
> ---
>
> Key: BEAM-3770
> URL: https://issues.apache.org/jira/browse/BEAM-3770
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-kafka
>Affects Versions: 2.0.0
> Environment: For repeating my situation, my running environment is:
> OS: Ubuntu 14.04.3 LTS
> JAVA: JDK 1.7
> Beam 2.0.0 (with Direct runner)
> Kafka 2.10-0.10.1.1
> Maven 3.5.0, in which dependencies are listed in pom.xml:
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-direct-java</artifactId>
>   <version>2.0.0</version>
>   <scope>runtime</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.0.1</version>
> </dependency>
>Reporter: Rick Lin
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.0.0
>
>
> Dear all,
>  I am using the kafkaIO sdk in my project (Beam 2.0.0 with the Direct runner).
>  With this sdk, there is a situation with *data latency*, described 
> in the following.
>  The data come from kafka at a fixed rate: 100 records per second.
>  I create a fixed window of 1 sec without delay. I found that the data 
> size is 70, 80, 104, or greater than or equal to 104.
>  After one day, the data latency happens in my running time, and the data 
> size will be only 10 in each window.
>  *In order to clearly explain it, I also provide my code in the following.* 
> " PipelineOptions readOptions = PipelineOptionsFactory._create_();
> *final* Pipeline p = Pipeline._create_(readOptions);
>  PCollection>> readData =
>   p.apply(KafkaIO._read_()   
>  .withBootstrapServers("127.0.0.1:9092")
>  .withTopic("kafkasink")
>  .withKeyDeserializer(StringDeserializer.*class*)
>  .withValueDeserializer(StringDeserializer.*class*)
>  .withoutMetadata())
>  .apply(ParDo._of_(*new* +DoFn, 
> TimestampedValue>>()+ {
>     @ProcessElement
>     *public* *void* test(ProcessContext c) *throws* ParseException {
>     String element = c.element().getValue();
>     *try* {
>   JsonNode arrNode = *new* ObjectMapper().readTree(element);
>   String t = arrNode.path("v").findValue("Timestamp").textValue();
>   DateTimeFormatter formatter = 
> DateTimeFormatter._ofPattern_("MM/dd/ HH:mm:ss.");
>      LocalDateTime dateTime = LocalDateTime._parse_(t, formatter);
>      java.time.Instant java_instant = 
> dateTime.atZone(ZoneId._systemDefault_()).toInstant();
>      Instant timestamp  = *new* Instant(java_instant.toEpochMilli());
>   c.output(TimestampedValue._of_(c.element(), timestamp));
>     } *catch* (JsonGenerationException e) {
>     e.printStackTrace();
>     } *catch* (JsonMappingException e) {
>     e.printStackTrace();
>   } *catch* (IOException e) {
>     e.printStackTrace();
>   }
>     }}));
>  PCollection>> readDivideData = 
> readData.apply(
>   Window.>> 
> _into_(FixedWindows._of_(Duration._standardSeconds_(1))
>   .withOffset(Duration.*_ZERO_*))
>   .triggering(AfterWatermark._pastEndOfWindow_()   
>  .withLateFirings(AfterProcessingTime._pastFirstElementInPane_()
>    .plusDelayOf(Duration.*_ZERO_*)))
>   .withAllowedLateness(Duration.*_ZERO_*)
>   .discardingFiredPanes());"
>  *In addition, the running result is as shown in the following.*
> "data-size=104
> coming-data-time=2018-02-27 02:00:49.117
> window-time=2018-02-27 02:00:49.999
>  data-size=78
> coming-data-time=2018-02-27 02:00:50.318
> window-time=2018-02-27 02:00:50.999
>  data-size=104
> coming-data-time=2018-02-27 02:00:51.102
> window-time=2018-02-27 02:00:51.999
>  After one day:
> data-size=10
> coming-data-time=2018-02-28 02:05:48.217
> window-time=2018-03-01 10:35:16.999 "
> If you have any idea about the problem (data latency), I am looking forward 
> to hearing from you.
> Thanks
> Rick



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3770) The problem of kafkaIO sdk for data latency

2018-03-02 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384259#comment-16384259
 ] 

Raghu Angadi commented on BEAM-3770:


This is being discussed on the user mailing list: 
[https://lists.apache.org/thread.html/db923fbb55469287a68742bf8276e0eb18db923632dd94468235a177@%3Cuser.beam.apache.org%3E]

Please reopen this bug if that discussion leads us to believe this is a 
KafkaIO issue. Please include the entire pipeline code if feasible, which makes 
it simpler to reproduce. Closing it now; there is no indication this is a 
KafkaIO issue yet.

> The problem of kafkaIO sdk for data latency
> ---
>
> Key: BEAM-3770
> URL: https://issues.apache.org/jira/browse/BEAM-3770
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-kafka
>Affects Versions: 2.0.0
> Environment: For repeating my situation, my running environment is:
> OS: Ubuntu 14.04.3 LTS
> JAVA: JDK 1.7
> Beam 2.0.0 (with Direct runner)
> Kafka 2.10-0.10.1.1
> Maven 3.5.0, in which dependencies are listed in pom.xml:
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-direct-java</artifactId>
>   <version>2.0.0</version>
>   <scope>runtime</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.0.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.0.1</version>
> </dependency>
>Reporter: Rick Lin
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.0.0
>
>
> Dear all,
>  I am using the kafkaIO sdk in my project (Beam 2.0.0 with the Direct runner).
>  With this sdk, there is a situation with *data latency*, described 
> in the following.
>  The data come from kafka at a fixed rate: 100 records per second.
>  I create a fixed window of 1 sec without delay. I found that the data 
> size is 70, 80, 104, or greater than or equal to 104.
>  After one day, the data latency happens in my running time, and the data 
> size will be only 10 in each window.
>  *In order to clearly explain it, I also provide my code in the following.* 
> " PipelineOptions readOptions = PipelineOptionsFactory._create_();
> *final* Pipeline p = Pipeline._create_(readOptions);
>  PCollection>> readData =
>   p.apply(KafkaIO._read_()   
>  .withBootstrapServers("127.0.0.1:9092")
>  .withTopic("kafkasink")
>  .withKeyDeserializer(StringDeserializer.*class*)
>  .withValueDeserializer(StringDeserializer.*class*)
>  .withoutMetadata())
>  .apply(ParDo._of_(*new* +DoFn, 
> TimestampedValue>>()+ {
>     @ProcessElement
>     *public* *void* test(ProcessContext c) *throws* ParseException {
>     String element = c.element().getValue();
>     *try* {
>   JsonNode arrNode = *new* ObjectMapper().readTree(element);
>   String t = arrNode.path("v").findValue("Timestamp").textValue();
>   DateTimeFormatter formatter = 
> DateTimeFormatter._ofPattern_("MM/dd/ HH:mm:ss.");
>      LocalDateTime dateTime = LocalDateTime._parse_(t, formatter);
>      java.time.Instant java_instant = 
> dateTime.atZone(ZoneId._systemDefault_()).toInstant();
>      Instant timestamp  = *new* Instant(java_instant.toEpochMilli());
>   c.output(TimestampedValue._of_(c.element(), timestamp));
>     } *catch* (JsonGenerationException e) {
>     e.printStackTrace();
>     } *catch* (JsonMappingException e) {
>     e.printStackTrace();
>   } *catch* (IOException e) {
>     e.printStackTrace();
>   }
>     }}));
>  PCollection>> readDivideData = 
> readData.apply(
>   Window.>> 
> _into_(FixedWindows._of_(Duration._standardSeconds_(1))
>   .withOffset(Duration.*_ZERO_*))
>   .triggering(AfterWatermark._pastEndOfWindow_()   
>  .withLateFirings(AfterProcessingTime._pastFirstElementInPane_()
>    .plusDelayOf(Duration.*_ZERO_*)))
>   .withAllowedLateness(Duration.*_ZERO_*)
>   .discardingFiredPanes());"
>  *In addition, the running result is as shown in the following.*
> "data-size=104
> coming-data-time=2018-02-27 02:00:49.117
> window-time=2018-02-27 02:00:49.999
>  data-size=78
> coming-data-time=2018-02-27 02:00:50.318
> window-time=2018-02-27 02:00:50.999
>  data-size=104
> coming-data-time=2018-02-27 02:00:51.102
> window-time=2018-02-27 02:00:51.999
>  After one day:
> data-size=10
> coming-data-time=2018-02-28 02:05:48.217
> window-time=2018-03-01 10:35:16.999 "
> If you have any idea about the problem (data latency), I am looking forward 
> to hearing from you.

[jira] [Created] (BEAM-3705) ApproximateUnique discards accumulated data with multiple firings.

2018-02-14 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-3705:
--

 Summary: ApproximateUnique discards accumulated data with multiple 
firings. 
 Key: BEAM-3705
 URL: https://issues.apache.org/jira/browse/BEAM-3705
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Raghu Angadi
Assignee: Raghu Angadi


`extractOutput()` on `ApproximateUniqueCombineFn` resets the accumulated value. 
This discards accumulated state, making subsequent firings incorrect.

 

First reported in https://stackoverflow.com/questions/48698028/. 
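
For context, a minimal generic sketch (not the ApproximateUnique code) of the `CombineFn` contract being violated: `extractOutput()` must leave the accumulator intact, because a runner can extract output from the same accumulator more than once, e.g. for speculative pane firings.

{code:java}
import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

/** Generic sketch: extractOutput() treats the accumulator as read-only. */
class SumFn extends CombineFn<Long, SumFn.Accum, Long> {
  static class Accum implements Serializable {
    long sum = 0;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum acc, Long input) {
    acc.sum += input;
    return acc;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accs) {
    Accum merged = createAccumulator();
    for (Accum acc : accs) {
      merged.sum += acc.sum;
    }
    return merged;
  }

  @Override
  public Long extractOutput(Accum acc) {
    return acc.sum; // read-only: no "acc.sum = 0" style reset here
  }
}
{code}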



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3689) Direct runner leak a reader for every 10 input records

2018-02-11 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-3689:
---
Fix Version/s: 2.4.0

> Direct runner leak a reader for every 10 input records
> --
>
> Key: BEAM-3689
> URL: https://issues.apache.org/jira/browse/BEAM-3689
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-direct
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Direct runner reads 10 records at a time from a reader. I think the intention 
> is to reuse the reader, but it is reused only if the reader is idle initially, 
> not when the source has messages available.
> When I was testing KafkaIO with the direct runner, it kept opening a new reader for 
> every 10 records and soon ran out of file descriptors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3689) Direct runner leak a reader for every 10 input records

2018-02-11 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-3689:
--

 Summary: Direct runner leak a reader for every 10 input records
 Key: BEAM-3689
 URL: https://issues.apache.org/jira/browse/BEAM-3689
 Project: Beam
  Issue Type: Improvement
  Components: runner-direct
Reporter: Raghu Angadi
Assignee: Thomas Groh


Direct runner reads 10 records at a time from a reader. I think the intention 
is to reuse the reader, but it is reused only if the reader is idle initially, not 
when the source has messages available.

When I was testing KafkaIO with the direct runner, it kept opening a new reader for 
every 10 records and soon ran out of file descriptors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3620) Deprecate older Kafka clients

2018-02-05 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-3620:
--

 Summary: Deprecate older Kafka clients
 Key: BEAM-3620
 URL: https://issues.apache.org/jira/browse/BEAM-3620
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-extensions
Affects Versions: 2.3.0
Reporter: Raghu Angadi
Assignee: Raghu Angadi
 Fix For: 2.4.0


Kafka versions 0.10.1 and above have better support for timestamps. We intend 
to deprecate older versions as a first step towards dropping that support 
entirely.

The older versions (0.9.x and 0.10.0) will still work at run time, but KafkaIO 
itself will no longer support them at build time. This lets us remove some of the 
reflection-based code used to support older versions at build time. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3526) Support for checkpointMark finalize in KafkaIO

2018-02-05 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-3526:
---
Fix Version/s: 2.4.0

> Support for checkpointMark finalize in KafkaIO
> --
>
> Key: BEAM-3526
> URL: https://issues.apache.org/jira/browse/BEAM-3526
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.2.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>
> Users have been asking for an option to commit offsets back to Kafka inside 
> `CheckpointMark.finalizeCheckpoint()`. This option is somewhat better than 
> setting 'AUTO_COMMIT' in the Kafka consumer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-3526) Support for checkpointMark finalize in KafkaIO

2018-02-05 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi closed BEAM-3526.
--
Resolution: Fixed

> Support for checkpointMark finalize in KafkaIO
> --
>
> Key: BEAM-3526
> URL: https://issues.apache.org/jira/browse/BEAM-3526
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.2.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.4.0
>
>
> Users have been asking for an option to commit offsets back to Kafka inside 
> `CheckpointMark.finalizeCheckpoint()`. This option is somewhat better than 
> setting 'AUTO_COMMIT' in the Kafka consumer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3611) Split KafkaIO.java into smaller files

2018-02-02 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-3611:
--

 Summary: Split KafkaIO.java into smaller files
 Key: BEAM-3611
 URL: https://issues.apache.org/jira/browse/BEAM-3611
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-extensions
Reporter: Raghu Angadi
Assignee: Reuven Lax
 Fix For: 2.4.0


KafkaIO.java has grown too big and includes both the source and sink 
implementations. Better to move these to their own files. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3611) Split KafkaIO.java into smaller files

2018-02-02 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-3611:
--

Assignee: Raghu Angadi  (was: Reuven Lax)

> Split KafkaIO.java into smaller files
> -
>
> Key: BEAM-3611
> URL: https://issues.apache.org/jira/browse/BEAM-3611
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: 2.4.0
>
>
> KafkaIO.java has grown too big and includes both the source and sink 
> implementations. Better to move these to their own files. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-3526) Support for checkpointMark finalize in KafkaIO

2018-01-24 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-3526:
---
Affects Version/s: 2.2.0

> Support for checkpointMark finalize in KafkaIO
> --
>
> Key: BEAM-3526
> URL: https://issues.apache.org/jira/browse/BEAM-3526
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.2.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
>
> Users have been asking for an option to commit offsets back to Kafka inside 
> `CheckpointMark.finalizeCheckpoint()`. This option is somewhat better than 
> setting 'AUTO_COMMIT' in the Kafka consumer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3526) Support for checkpointMark finalize in KafkaIO

2018-01-24 Thread Raghu Angadi (JIRA)
Raghu Angadi created BEAM-3526:
--

 Summary: Support for checkpointMark finalize in KafkaIO
 Key: BEAM-3526
 URL: https://issues.apache.org/jira/browse/BEAM-3526
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-extensions
Reporter: Raghu Angadi
Assignee: Reuven Lax


Users have been asking for an option to commit offsets back to Kafka inside 
`CheckpointMark.finalizeCheckpoint()`. This option is somewhat better than 
setting 'AUTO_COMMIT' in the Kafka consumer. 
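
A sketch of how such an option could look on the read transform (the `commitOffsetsInFinalize()` name is illustrative; a consumer `group.id` would be required for commits):

{code:java}
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("broker_1:9092")
    .withTopic("events")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .updateConsumerProperties(ImmutableMap.of(
        ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
    // Offsets are committed back to Kafka when the runner finalizes a
    // checkpoint, instead of via the consumer's enable.auto.commit.
    .commitOffsetsInFinalize());
{code}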



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3526) Support for checkpointMark finalize in KafkaIO

2018-01-24 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-3526:
--

Assignee: Raghu Angadi  (was: Reuven Lax)

> Support for checkpointMark finalize in KafkaIO
> --
>
> Key: BEAM-3526
> URL: https://issues.apache.org/jira/browse/BEAM-3526
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.2.0
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Major
>
> Users have been asking for an option to commit offsets back to Kafka inside 
> `CheckpointMark.finalizeCheckpoint()`. This option is somewhat better than 
> setting 'AUTO_COMMIT' in the Kafka consumer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-2257) KafkaIO write without key requires a producer fn

2018-01-24 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi closed BEAM-2257.
--
Resolution: Fixed

> KafkaIO write without key requires a producer fn
> 
>
> Key: BEAM-2257
> URL: https://issues.apache.org/jira/browse/BEAM-2257
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Major
> Fix For: 2.3.0
>
>
> The {{KafkaIO}} javadoc says that it's possible to write {{String}} values directly 
> to the topic without a key:
> {code}
>PCollection<String> strings = ...;
>strings.apply(KafkaIO.write()
>.withBootstrapServers("broker_1:9092,broker_2:9092")
>.withTopic("results")
>.withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>.values()
>  );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a class of serializer, not an instance. 
> So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a kafka producer fn is required, 
> else the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:321)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:300)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-2704) KafkaIO: NPE without key serializer set

2018-01-24 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi closed BEAM-2704.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> KafkaIO: NPE without key serializer set
> ---
>
> Key: BEAM-2704
> URL: https://issues.apache.org/jira/browse/BEAM-2704
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>Priority: Major
> Fix For: 2.3.0
>
>
> The KafkaIO javadoc implies that you do not need to set a Serializer if you 
> only want to emit values:
> {code}
>  * Often you might want to write just values without any keys to Kafka. 
> Use {@code values()} to
>  * write records with default empty(null) key:
>  *
>  * {@code
 *  PCollection<String> strings = ...;
>  *  strings.apply(KafkaIO.write()
>  *  .withBootstrapServers("broker_1:9092,broker_2:9092")
>  *  .withTopic("results")
>  *  .withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>  *  .values()
>  *);
>  * }
> {code}
> However, if you don't set the key serializer then Kafka blows up when trying 
> to instantiate the key serializer (in Kafka 0.10.1, at least). It would be 
> more convenient if KafkaIO worked as documented and assigned a null 
> serializer if values() is used.  
> Relevant stack trace:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:230)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:163)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1582)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeSetup(Unknown
>  Source)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2704) KafkaIO: NPE without key serializer set

2018-01-24 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338195#comment-16338195
 ] 

Raghu Angadi commented on BEAM-2704:


Resolving this as BEAM-2257 is fixed (it still needs to be marked resolved).

> KafkaIO: NPE without key serializer set
> ---
>
> Key: BEAM-2704
> URL: https://issues.apache.org/jira/browse/BEAM-2704
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>Priority: Major
>
> The KafkaIO javadoc implies that you do not need to set a Serializer if you 
> only want to emit values:
> {code}
>  * Often you might want to write just values without any keys to Kafka. 
> Use {@code values()} to
>  * write records with default empty(null) key:
>  *
>  * {@code
>  *  PCollection<String> strings = ...;
>  *  strings.apply(KafkaIO.<Void, String>write()
>  *  .withBootstrapServers("broker_1:9092,broker_2:9092")
>  *  .withTopic("results")
>  *  .withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>  *  .values()
>  *);
>  * }
> {code}
> However, if you don't set the key serializer then Kafka blows up when trying 
> to instantiate the key serializer (in Kafka 0.10.1, at least). It would be 
> more convenient if KafkaIO worked as documented and assigned a null 
> serializer if values() is used.  
> Relevant stack trace:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:230)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1582)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeSetup(Unknown
>  Source)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-3093) add an option 'FirstPollOffsetStrategy' to KafkaIO

2017-11-27 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268168#comment-16268168
 ] 

Raghu Angadi edited comment on BEAM-3093 at 11/28/17 5:52 AM:
--

[~mingmxu], assigning this to you. Let me know if {{withStartReadTime()}} does not 
do what you are looking for.


was (Author: rangadi):
[~mingmxu], assigning this to you. Let me know if `withStartReadTime()` does not 
do what you are looking for.

> add an option 'FirstPollOffsetStrategy' to KafkaIO
> --
>
> Key: BEAM-3093
> URL: https://issues.apache.org/jira/browse/BEAM-3093
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>
> This is a feature borrowed from Storm KafkaSpout.
> *What's the issue?*
> In KafkaIO, when the offset is stored in a checkpoint or auto-committed, it 
> cannot be changed by the application to force reading from earliest/latest. 
> This feature is important to reset the start offset when relaunching a job.
> *Proposed solution:*
> By borrowing the FirstPollOffsetStrategy concept, users can have more options:
> 1). *{{EARLIEST}}*: always start_from_beginning no matter of what's in 
> checkpoint/auto_commit;
> 2). *{{LATEST}}*: always start_from_latest no matter of what's in 
> checkpoint/auto_commit;
> 3). *{{UNCOMMITTED_EARLIEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_beginning, otherwise start_from_previous_offset;
> 4). *{{UNCOMMITTED_LATEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_latest, otherwise start_from_previous_offset;
> [~rangadi], any comments?
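A hypothetical sketch of the proposal; the enum constants follow the ticket, 
but the helper and its signature are assumptions rather than actual KafkaIO 
code:

{code}
enum FirstPollOffsetStrategy {
  EARLIEST,              // always start from the beginning
  LATEST,                // always start from the latest offset
  UNCOMMITTED_EARLIEST,  // resume from checkpoint/auto_commit offset, else earliest
  UNCOMMITTED_LATEST     // resume from checkpoint/auto_commit offset, else latest
}

// committedOffset < 0 means no offset was found in checkpoint/auto_commit.
static long firstPollOffset(FirstPollOffsetStrategy strategy,
                            long committedOffset, long earliest, long latest) {
  switch (strategy) {
    case EARLIEST:             return earliest;
    case LATEST:               return latest;
    case UNCOMMITTED_EARLIEST: return committedOffset < 0 ? earliest : committedOffset;
    case UNCOMMITTED_LATEST:   return committedOffset < 0 ? latest : committedOffset;
    default: throw new AssertionError("unknown strategy: " + strategy);
  }
}
{code}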



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3093) add an option 'FirstPollOffsetStrategy' to KafkaIO

2017-11-27 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268168#comment-16268168
 ] 

Raghu Angadi commented on BEAM-3093:


[~mingmxu], assigning this to you. Let me know if `withStartReadTime()` does not 
do what you are looking for.

> add an option 'FirstPollOffsetStrategy' to KafkaIO
> --
>
> Key: BEAM-3093
> URL: https://issues.apache.org/jira/browse/BEAM-3093
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>
> This is a feature borrowed from Storm KafkaSpout.
> *What's the issue?*
> In KafkaIO, when the offset is stored in a checkpoint or auto-committed, it 
> cannot be changed by the application to force reading from earliest/latest. 
> This feature is important to reset the start offset when relaunching a job.
> *Proposed solution:*
> By borrowing the FirstPollOffsetStrategy concept, users can have more options:
> 1). *{{EARLIEST}}*: always start_from_beginning no matter of what's in 
> checkpoint/auto_commit;
> 2). *{{LATEST}}*: always start_from_latest no matter of what's in 
> checkpoint/auto_commit;
> 3). *{{UNCOMMITTED_EARLIEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_beginning, otherwise start_from_previous_offset;
> 4). *{{UNCOMMITTED_LATEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_latest, otherwise start_from_previous_offset;
> [~rangadi], any comments?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3093) add an option 'FirstPollOffsetStrategy' to KafkaIO

2017-11-27 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-3093:
--

Assignee: Xu Mingmin  (was: Raghu Angadi)

> add an option 'FirstPollOffsetStrategy' to KafkaIO
> --
>
> Key: BEAM-3093
> URL: https://issues.apache.org/jira/browse/BEAM-3093
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>
> This is a feature borrowed from Storm KafkaSpout.
> *What's the issue?*
> In KafkaIO, when the offset is stored in a checkpoint or auto-committed, it 
> cannot be changed by the application to force reading from earliest/latest. 
> This feature is important to reset the start offset when relaunching a job.
> *Proposed solution:*
> By borrowing the FirstPollOffsetStrategy concept, users can have more options:
> 1). *{{EARLIEST}}*: always start_from_beginning no matter of what's in 
> checkpoint/auto_commit;
> 2). *{{LATEST}}*: always start_from_latest no matter of what's in 
> checkpoint/auto_commit;
> 3). *{{UNCOMMITTED_EARLIEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_beginning, otherwise start_from_previous_offset;
> 4). *{{UNCOMMITTED_LATEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_latest, otherwise start_from_previous_offset;
> [~rangadi], any comments?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3218) Change PubsubBoundedWriter to check total byte size

2017-11-21 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261488#comment-16261488
 ] 

Raghu Angadi commented on BEAM-3218:


Thanks [~thang], the quota enhancements also make sense and seem equally useful 
and important. 

> Change PubsubBoundedWriter to check total byte size
> ---
>
> Key: BEAM-3218
> URL: https://issues.apache.org/jira/browse/BEAM-3218
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Reporter: Thang Nguyen
>Assignee: Reuven Lax
>Priority: Minor
>
> The PubsubBoundedWriter [does not 
> check|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L895-L897]
>  the total byte size of outgoing messages when publishing. 
> AC:
> * Add a check to ensure the total bytes of the outgoing messages are less than 
> Pub/Sub's allowed max
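A hypothetical illustration of that check (not the actual PubsubIO code): track 
the running byte size of the buffered batch and flush before a message would 
push the total past Pub/Sub's per-request limit. The 10 MB constant and the 
{{publishBatch()}} helper are assumptions of the sketch:

{code}
// Sketch only: the fields and helpers here are hypothetical, not PubsubIO's real ones.
private static final int MAX_PUBLISH_BATCH_BYTE_SIZE = 10 * 1000 * 1000; // assumed Pub/Sub limit

private final List<byte[]> output = new ArrayList<>();
private long currentOutputBytes = 0;

private void bufferMessage(byte[] payload) throws IOException {
  // Flush on total byte size, not just on message count.
  if (!output.isEmpty()
      && currentOutputBytes + payload.length > MAX_PUBLISH_BATCH_BYTE_SIZE) {
    publishBatch(output);  // hypothetical flush helper
    output.clear();
    currentOutputBytes = 0;
  }
  output.add(payload);
  currentOutputBytes += payload.length;
}
{code}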



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2257) KafkaIO write without key requires a producer fn

2017-11-08 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16244375#comment-16244375
 ] 

Raghu Angadi commented on BEAM-2257:


Thanks [~nerdynick] for the fix. [~jbonofre] please resolve this when you get a 
chance.

> KafkaIO write without key requires a producer fn
> 
>
> Key: BEAM-2257
> URL: https://issues.apache.org/jira/browse/BEAM-2257
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
> Fix For: 2.3.0
>
>
> The {{KafkaIO}} javadoc says that it's possible to write directly {{String}} 
> to the topic without key:
> {code}
>PCollection<String> strings = ...;
>strings.apply(KafkaIO.<Void, String>write()
>.withBootstrapServers("broker_1:9092,broker_2:9092")
>.withTopic("results")
>.withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>.values()
>  );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a class of serializer, not an instance. 
> So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a Kafka producer fn is required; 
> otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2257) KafkaIO write without key requires a producer fn

2017-11-08 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2257:
---
Fix Version/s: 2.3.0

> KafkaIO write without key requires a producer fn
> 
>
> Key: BEAM-2257
> URL: https://issues.apache.org/jira/browse/BEAM-2257
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
> Fix For: 2.3.0
>
>
> The {{KafkaIO}} javadoc says that it's possible to write directly {{String}} 
> to the topic without key:
> {code}
>PCollection<String> strings = ...;
>strings.apply(KafkaIO.<Void, String>write()
>.withBootstrapServers("broker_1:9092,broker_2:9092")
>.withTopic("results")
>.withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>.values()
>  );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a class of serializer, not an instance. 
> So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a Kafka producer fn is required; 
> otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-30 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16226010#comment-16226010
 ] 

Raghu Angadi commented on BEAM-2979:


It was marked a blocker by mistake. 

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
> Fix For: 2.3.0
>
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 
> 
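One way to remove the race is to read {{curRecord}} into a local once and use 
only that snapshot, so a concurrent {{advance()}} cannot null the field between 
the check and the apply. A minimal sketch, assuming the fields quoted above 
(not necessarily the committed fix):

{noformat}
@Override
public Instant getWatermark() {
  KafkaRecord<K, V> record = curRecord;  // single read of the shared field
  if (record == null) {
    LOG.debug("{}: getWatermark() : no records have been read yet.", name);
    return initialWatermark;
  }
  return source.spec.getWatermarkFn() != null
      ? source.spec.getWatermarkFn().apply(record) : curTimestamp;
}
{noformat}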

[jira] [Updated] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-30 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2979:
---
Fix Version/s: (was: 2.2.0)
   2.3.0

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
> Fix For: 2.3.0
>
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 
> 

[jira] [Updated] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-30 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2979:
---
Priority: Major  (was: Blocker)

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
> Fix For: 2.3.0
>
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 
> 

[jira] [Commented] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-30 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225831#comment-16225831
 ] 

Raghu Angadi commented on BEAM-2979:


I think it is good to have. There is no workaround for Flink users that set a 
watermark function.

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
>Priority: Blocker
> Fix For: 2.2.0
>
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> 

[jira] [Updated] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-30 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2979:
---
Fix Version/s: 2.2.0

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
>Priority: Blocker
> Fix For: 2.2.0
>
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 
> 

[jira] [Commented] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-30 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225817#comment-16225817
 ] 

Raghu Angadi commented on BEAM-2979:


No, Cham is waiting on Jenkins before merging. I cleared `Fix Version`. 

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
>Priority: Blocker
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 

[jira] [Updated] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-30 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2979:
---
Fix Version/s: (was: 2.2.0)

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
>Priority: Blocker
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 
> 

[jira] [Assigned] (BEAM-307) Upgrade/Test to Kafka 0.10

2017-10-30 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-307:
-

Assignee: Xu Mingmin  (was: Jean-Baptiste Onofré)

> Upgrade/Test to Kafka 0.10
> --
>
> Key: BEAM-307
> URL: https://issues.apache.org/jira/browse/BEAM-307
> Project: Beam
>  Issue Type: Task
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Xu Mingmin
>  Labels: backward-incompatible
>
> I'm going to test at least that KafkaIO works fine with Kafka 0.10 (I'm 
> preparing new complete samples around that).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-1664) Support Kafka0.8.x client in KafkaIO

2017-10-26 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221039#comment-16221039
 ] 

Raghu Angadi commented on BEAM-1664:


[~r7raul], there are no plans to support 0.8.x. It is a very old version and 
the client works entirely differently. We are in fact considering deprecating 
0.9.x support. I don't think it is worth the effort to restructure KafkaIO to 
support 0.8.x.


> Support  Kafka0.8.x client in KafkaIO
> -
>
> Key: BEAM-1664
> URL: https://issues.apache.org/jira/browse/BEAM-1664
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: JiJun Tang
>Assignee: Reuven Lax
>
> Kafka 0.8 is not supported yet; there's a big change from 0.8 to 0.9. So we 
> need to create a specific KafkaIO module for 0.8. After completing this 
> module, we will consider extracting common code into a kafkaio-common module.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-1664) Support Kafka0.8.x client in KafkaIO

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi reassigned BEAM-1664:
--

Assignee: Reuven Lax  (was: Raghu Angadi)

> Support  Kafka0.8.x client in KafkaIO
> -
>
> Key: BEAM-1664
> URL: https://issues.apache.org/jira/browse/BEAM-1664
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: JiJun Tang
>Assignee: Reuven Lax
>
> Kafka 0.8 is not supported yet; there's a big change from 0.8 to 0.9. So we 
> need to create a specific KafkaIO module for 0.8. After completing this 
> module, we will consider extracting common code into a kafkaio-common module.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2704) KafkaIO: NPE without key serializer set

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2704:
---
Priority: Major  (was: Blocker)

> KafkaIO: NPE without key serializer set
> ---
>
> Key: BEAM-2704
> URL: https://issues.apache.org/jira/browse/BEAM-2704
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>
> The KafkaIO javadoc implies that you do not need to set a Serializer if you 
> only want to emit values:
> {code}
>  * Often you might want to write just values without any keys to Kafka. 
> Use {@code values()} to
>  * write records with default empty(null) key:
>  *
>  * {@code
>  *  PCollection<String> strings = ...;
>  *  strings.apply(KafkaIO.<Void, String>write()
>  *  .withBootstrapServers("broker_1:9092,broker_2:9092")
>  *  .withTopic("results")
>  *  .withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>  *  .values()
>  *);
>  * }
> {code}
> However, if you don't set the key serializer then Kafka blows up when trying 
> to instantiate the key serializer (in Kafka 0.10.1, at least). It would be 
> more convenient if KafkaIO worked as documented and assigned a null 
> serializer if values() is used.  
> Relevant stack trace:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:230)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1582)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeSetup(Unknown
>  Source)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2704) KafkaIO: NPE without key serializer set

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2704:
---
Fix Version/s: (was: 2.2.0)

> KafkaIO: NPE without key serializer set
> ---
>
> Key: BEAM-2704
> URL: https://issues.apache.org/jira/browse/BEAM-2704
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>Priority: Blocker
>
> The KafkaIO javadoc implies that you do not need to set a Serializer if you 
> only want to emit values:
> {code}
>  * Often you might want to write just values without any keys to Kafka. 
> Use {@code values()} to
>  * write records with default empty(null) key:
>  *
>  * {@code
>  *  PCollection<String> strings = ...;
>  *  strings.apply(KafkaIO.<Void, String>write()
>  *  .withBootstrapServers("broker_1:9092,broker_2:9092")
>  *  .withTopic("results")
>  *  .withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>  *  .values()
>  *);
>  * }
> {code}
> However, if you don't set the key serializer then Kafka blows up when trying 
> to instantiate the key serializer (in Kafka 0.10.1, at least). It would be 
> more convenient if KafkaIO worked as documented and assigned a null 
> serializer if values() is used.  
> Relevant stack trace:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:230)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1582)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeSetup(Unknown
>  Source)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2257) KafkaIO write without key requires a producer fn

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2257:
---
Priority: Major  (was: Blocker)

> KafkaIO write without key requires a producer fn
> 
>
> Key: BEAM-2257
> URL: https://issues.apache.org/jira/browse/BEAM-2257
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>
> The {{KafkaIO}} javadoc says that it's possible to write directly {{String}} 
> to the topic without key:
> {code}
>PCollection<String> strings = ...;
>strings.apply(KafkaIO.<Void, String>write()
>.withBootstrapServers("broker_1:9092,broker_2:9092")
>.withTopic("results")
>.withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>.values()
>  );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a class of serializer, not an instance. 
> So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a Kafka producer fn is required; 
> otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2257) KafkaIO write without key requires a producer fn

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2257:
---
Fix Version/s: (was: 2.2.0)

> KafkaIO write without key requires a producer fn
> 
>
> Key: BEAM-2257
> URL: https://issues.apache.org/jira/browse/BEAM-2257
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>
> The {{KafkaIO}} javadoc says that it's possible to write directly {{String}} 
> to the topic without key:
> {code}
>PCollection<String> strings = ...;
>strings.apply(KafkaIO.<Void, String>write()
>.withBootstrapServers("broker_1:9092,broker_2:9092")
>.withTopic("results")
>.withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>.values()
>  );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a class of serializer, not an instance. 
> So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a Kafka producer fn is required; 
> otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2257) KafkaIO write without key requires a producer fn

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2257:
---
Fix Version/s: 2.2.0

> KafkaIO write without key requires a producer fn
> 
>
> Key: BEAM-2257
> URL: https://issues.apache.org/jira/browse/BEAM-2257
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Blocker
> Fix For: 2.2.0
>
>
> The {{KafkaIO}} javadoc says that it's possible to write directly {{String}} 
> to the topic without key:
> {code}
>PCollection<String> strings = ...;
>strings.apply(KafkaIO.<Void, String>write()
>.withBootstrapServers("broker_1:9092,broker_2:9092")
>.withTopic("results")
>.withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>.values()
>  );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a class of serializer, not an instance. 
> So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a Kafka producer fn is required; 
> otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2257) KafkaIO write without key requires a producer fn

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2257:
---
Priority: Blocker  (was: Major)

> KafkaIO write without key requires a producer fn
> 
>
> Key: BEAM-2257
> URL: https://issues.apache.org/jira/browse/BEAM-2257
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>Priority: Blocker
> Fix For: 2.2.0
>
>
> The {{KafkaIO}} javadoc says that it's possible to write {{String}} values 
> directly to the topic without a key:
> {code}
>PCollection<String> strings = ...;
>strings.apply(KafkaIO.<Void, String>write()
>.withBootstrapServers("broker_1:9092,broker_2:9092")
>.withTopic("results")
>.withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>.values()
>  );
> {code}
> This is not fully correct:
> 1. {{withValueSerializer()}} requires a serializer class, not an instance. 
> So, it should be {{withValueSerializer(StringSerializer.class)}}.
> 2. As the key serializer is not provided, a kafka producer fn is required; 
> otherwise, the user will get:
> {code}
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> producer
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
> {code}
> A possible workaround is to create a {{VoidSerializer}} and pass it via 
> {{withKeySerializer()}} or provide the producer fn.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2704) KafkaIO: NPE without key serializer set

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2704:
---
Priority: Blocker  (was: Major)

> KafkaIO: NPE without key serializer set
> ---
>
> Key: BEAM-2704
> URL: https://issues.apache.org/jira/browse/BEAM-2704
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>Priority: Blocker
> Fix For: 2.2.0
>
>
> The KafkaIO javadoc implies that you do not need to set a Serializer if you 
> only want to emit values:
> {code}
>  * Often you might want to write just values without any keys to Kafka. 
> Use {@code values()} to
>  * write records with default empty(null) key:
>  *
>  * {@code
>  *  PCollection<String> strings = ...;
>  *  strings.apply(KafkaIO.<Void, String>write()
>  *  .withBootstrapServers("broker_1:9092,broker_2:9092")
>  *  .withTopic("results")
>  *  .withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>  *  .values()
>  *);
>  * }
> {code}
> However, if you don't set the key serializer, Kafka blows up when trying 
> to instantiate it (in Kafka 0.10.1, at least). It would be 
> more convenient if KafkaIO worked as documented and assigned a null 
> serializer when values() is used.
> Relevant stack trace:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:230)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1582)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeSetup(Unknown
>  Source)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2704) KafkaIO: NPE without key serializer set

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2704:
---
Fix Version/s: 2.2.0

> KafkaIO: NPE without key serializer set
> ---
>
> Key: BEAM-2704
> URL: https://issues.apache.org/jira/browse/BEAM-2704
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>Priority: Blocker
> Fix For: 2.2.0
>
>
> The KafkaIO javadoc implies that you do not need to set a Serializer if you 
> only want to emit values:
> {code}
>  * Often you might want to write just values without any keys to Kafka. 
> Use {@code values()} to
>  * write records with default empty(null) key:
>  *
>  * {@code
>  *  PCollection<String> strings = ...;
>  *  strings.apply(KafkaIO.<Void, String>write()
>  *  .withBootstrapServers("broker_1:9092,broker_2:9092")
>  *  .withTopic("results")
>  *  .withValueSerializer(new StringSerializer()) // just need serializer 
> for value
>  *  .values()
>  *);
>  * }
> {code}
> However, if you don't set the key serializer, Kafka blows up when trying 
> to instantiate it (in Kafka 0.10.1, at least). It would be 
> more convenient if KafkaIO worked as documented and assigned a null 
> serializer when values() is used.
> Relevant stack trace:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:230)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1582)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeSetup(Unknown
>  Source)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2703) KafkaIO: watermark outside the bounds of BoundedWindow

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2703:
---
Fix Version/s: (was: 2.2.0)

> KafkaIO: watermark outside the bounds of BoundedWindow
> --
>
> Key: BEAM-2703
> URL: https://issues.apache.org/jira/browse/BEAM-2703
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>
> KafkaIO appears to use an incorrect lower bound for its initial watermark 
> with respect to BoundedWindow.TIMESTAMP_MIN_VALUE.
> KafkaIO's initial watermark:
> new Instant(Long.MIN_VALUE) -> -9223372036854775808
> BoundedWindow.TIMESTAMP_MIN_VALUE:
> new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)) -> 
> -9223372036854775
> The difference is that the last three digits have been truncated due to the 
> micro to millis conversion.
> This difference can cause errors in runners that assert that the input 
> watermark can never regress as KafkaIO gives a value below the lower bound 
> when no messages have been received yet. For consistency it would probably be 
> best for it to use BoundedWindow.TIMESTAMP_MIN_VALUE.
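> To see the truncation concretely, a minimal sketch (plain Java arithmetic, no 
> Beam internals assumed):
> {code}
> long raw = Long.MIN_VALUE;                                // -9223372036854775808
> long bounded =
>     java.util.concurrent.TimeUnit.MICROSECONDS.toMillis(raw); // -9223372036854775
> // Integer division by 1000 truncates the last three digits, so raw < bounded,
> // and new Instant(raw) sorts below BoundedWindow.TIMESTAMP_MIN_VALUE.
> {code}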



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2703) KafkaIO: watermark outside the bounds of BoundedWindow

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2703:
---
Priority: Blocker  (was: Major)

> KafkaIO: watermark outside the bounds of BoundedWindow
> --
>
> Key: BEAM-2703
> URL: https://issues.apache.org/jira/browse/BEAM-2703
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>Priority: Blocker
>
> KafkaIO appears to use an incorrect lower bound for its initial watermark 
> with respect to BoundedWindow.TIMESTAMP_MIN_VALUE.
> KafkaIO's initial watermark:
> new Instant(Long.MIN_VALUE) -> -9223372036854775808
> BoundedWindow.TIMESTAMP_MIN_VALUE:
> new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)) -> 
> -9223372036854775
> The difference is that the last three digits have been truncated due to the 
> micro to millis conversion.
> This difference can cause errors in runners that assert that the input 
> watermark can never regress as KafkaIO gives a value below the lower bound 
> when no messages have been received yet. For consistency it would probably be 
> best for it to use BoundedWindow.TIMESTAMP_MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2703) KafkaIO: watermark outside the bounds of BoundedWindow

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2703:
---
Priority: Major  (was: Blocker)

> KafkaIO: watermark outside the bounds of BoundedWindow
> --
>
> Key: BEAM-2703
> URL: https://issues.apache.org/jira/browse/BEAM-2703
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>
> KafkaIO appears to use an incorrect lower bound for its initial watermark 
> with respect to BoundedWindow.TIMESTAMP_MIN_VALUE.
> KafkaIO's initial watermark:
> new Instant(Long.MIN_VALUE) -> -9223372036854775808
> BoundedWindow.TIMESTAMP_MIN_VALUE:
> new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)) -> 
> -9223372036854775
> The difference is that the last three digits have been truncated due to the 
> micro to millis conversion.
> This difference can cause errors in runners that assert that the input 
> watermark can never regress as KafkaIO gives a value below the lower bound 
> when no messages have been received yet. For consistency it would probably be 
> best for it to use BoundedWindow.TIMESTAMP_MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2703) KafkaIO: watermark outside the bounds of BoundedWindow

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2703:
---
Fix Version/s: 2.2.0

> KafkaIO: watermark outside the bounds of BoundedWindow
> --
>
> Key: BEAM-2703
> URL: https://issues.apache.org/jira/browse/BEAM-2703
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Chris Pettitt
>Assignee: Raghu Angadi
>Priority: Blocker
>
> KafkaIO appears to use an incorrect lower bound for its initial watermark 
> with respect to BoundedWindow.TIMESTAMP_MIN_VALUE.
> KafkaIO's initial watermark:
> new Instant(Long.MIN_VALUE) -> -9223372036854775808
> BoundedWindow.TIMESTAMP_MIN_VALUE:
> new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)) -> 
> -9223372036854775
> The difference is that the last three digits have been truncated due to the 
> micro to millis conversion.
> This difference can cause errors in runners that assert that the input 
> watermark can never regress as KafkaIO gives a value below the lower bound 
> when no messages have been received yet. For consistency it would probably be 
> best for it to use BoundedWindow.TIMESTAMP_MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2979:
---
Priority: Blocker  (was: Major)

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
>Priority: Blocker
> Fix For: 2.2.0
>
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 
> 
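
One way to close the race (a hedged sketch, not necessarily the committed fix): have 
getWatermark() read the fields that advance() mutates into locals exactly once, so 
the null check and the watermark function see the same record:

{noformat}
@Override
public Instant getWatermark() {
  // Capture shared state once; advance() may reassign these concurrently.
  KafkaRecord<K, V> record = curRecord;
  Instant timestamp = curTimestamp;
  if (record == null) {
    LOG.debug("{}: getWatermark() : no records have been read yet.", name);
    return initialWatermark;
  }
  return source.spec.getWatermarkFn() != null
      ? source.spec.getWatermarkFn().apply(record) : timestamp;
}
{noformat}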

[jira] [Updated] (BEAM-2979) Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and KafkaIO.UnboundedKafkaReader.advance()

2017-10-26 Thread Raghu Angadi (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi updated BEAM-2979:
---
Fix Version/s: 2.2.0

> Race condition between KafkaIO.UnboundedKafkaReader.getWatermark() and 
> KafkaIO.UnboundedKafkaReader.advance()
> -
>
> Key: BEAM-2979
> URL: https://issues.apache.org/jira/browse/BEAM-2979
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wesley Tanaka
>Assignee: Raghu Angadi
> Fix For: 2.2.0
>
>
> getWatermark() looks like this:
> {noformat}
> @Override
> public Instant getWatermark() {
>   if (curRecord == null) {
> LOG.debug("{}: getWatermark() : no records have been read yet.", 
> name);
> return initialWatermark;
>   }
>   return source.spec.getWatermarkFn() != null
>   ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
> }
> {noformat}
> advance() has code in it that looks like this:
> {noformat}
>   curRecord = null; // user coders below might throw.
>   // apply user deserializers.
>   // TODO: write records that can't be deserialized to a 
> "dead-letter" additional output.
>   KafkaRecord<K, V> record = new KafkaRecord<>(
>   rawRecord.topic(),
>   rawRecord.partition(),
>   rawRecord.offset(),
>   consumerSpEL.getRecordTimestamp(rawRecord),
>   keyDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.key()),
>   valueDeserializerInstance.deserialize(rawRecord.topic(), 
> rawRecord.value()));
>   curTimestamp = (source.spec.getTimestampFn() == null)
>   ? Instant.now() : source.spec.getTimestampFn().apply(record);
>   curRecord = record;
> {noformat}
> There's a race condition between these two blocks of code which is exposed at 
> the very least in the FlinkRunner, which calls getWatermark() periodically 
> from a timer.
> The symptom of the race condition is a stack trace that looks like this (SDK 
> 2.0.0):
> {noformat}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:910)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:853)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: TimerException{java.lang.NullPointerException}
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:568)
>   at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:565)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.getWatermark(KafkaIO.java:1210)
>   at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.onProcessingTime(UnboundedSourceWrapper.java:431)
>   at 
> 

[jira] [Commented] (BEAM-3093) add an option 'FirstPollOffsetStrategy' to KafkaIO

2017-10-24 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217318#comment-16217318
 ] 

Raghu Angadi commented on BEAM-3093:


Yes. withStartReadTime().


> add an option 'FirstPollOffsetStrategy' to KafkaIO
> --
>
> Key: BEAM-3093
> URL: https://issues.apache.org/jira/browse/BEAM-3093
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Kenneth Knowles
>
> This is a feature borrowed from Storm KafkaSpout.
> *What's the issue?*
> In KafkaIO, when the offset is stored either in a checkpoint or auto_committed, it 
> cannot be changed in the application to force reading from earliest/latest. 
> This feature is important for resetting the start offset when relaunching a job.
> *Proposed solution:*
> By borrowing the FirstPollOffsetStrategy concept, users can have more options:
> 1). *{{EARLIEST}}*: always start_from_beginning no matter of what's in 
> checkpoint/auto_commit;
> 2). *{{LATEST}}*: always start_from_latest no matter of what's in 
> checkpoint/auto_commit;
> 3). *{{UNCOMMITTED_EARLIEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_beginning, otherwise start_from_previous_offset;
> 4). *{{UNCOMMITTED_LATEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_latest, otherwise start_from_previous_offset;
> [~rangadi], any comments?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3093) add an option 'FirstPollOffsetStrategy' to KafkaIO

2017-10-24 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217240#comment-16217240
 ] 

Raghu Angadi commented on BEAM-3093:


Did you check out {{withStartReadTime()}} in KafkaIO? This might be what you 
are looking for in (1) & (2) when auto_commit is enabled.

I think it is better not to override checkpointed offsets.
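
For reference, a hedged sketch of reading with {{withStartReadTime()}} (the servers, 
topic, and timestamp are placeholders; Kafka 0.10.1+ is assumed, since the first-poll 
seek relies on offsets-for-times):

{code}
PCollection<KafkaRecord<Long, String>> records = pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // On the first poll, seek each partition to the earliest offset whose
        // record timestamp is at or after this instant.
        .withStartReadTime(Instant.parse("2017-10-24T00:00:00Z")));
{code}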

> add an option 'FirstPollOffsetStrategy' to KafkaIO
> --
>
> Key: BEAM-3093
> URL: https://issues.apache.org/jira/browse/BEAM-3093
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Kenneth Knowles
>
> This is a feature borrowed from Storm KafkaSpout.
> *What's the issue?*
> In KafkaIO, when the offset is stored either in a checkpoint or auto_committed, it 
> cannot be changed in the application to force reading from earliest/latest. 
> This feature is important for resetting the start offset when relaunching a job.
> *Proposed solution:*
> By borrowing the FirstPollOffsetStrategy concept, users can have more options:
> 1). *{{EARLIEST}}*: always start_from_beginning no matter of what's in 
> checkpoint/auto_commit;
> 2). *{{LATEST}}*: always start_from_latest no matter of what's in 
> checkpoint/auto_commit;
> 3). *{{UNCOMMITTED_EARLIEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_beginning, otherwise start_from_previous_offset;
> 4). *{{UNCOMMITTED_LATEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_latest, otherwise start_from_previous_offset;
> [~rangadi], any comments?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-3093) add an option 'FirstPollOffsetStrategy' to KafkaIO

2017-10-23 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216107#comment-16216107
 ] 

Raghu Angadi edited comment on BEAM-3093 at 10/24/17 12:13 AM:
---

(3) is the same as setting the "auto.offset.reset" Kafka consumer config to "earliest".
(4) is the default behavior, the same as setting the "auto.offset.reset" config to 
"latest".

It is better not to mix 'checkpointed' offsets and 'auto_committed' offsets. If 
you restart a job from scratch, 'checkpointed' offsets are discarded as well.

(1) & (2) might be useful in the case of 'auto_committed' offsets. Users can 
always remove auto_committed offsets in Kafka through admin commands. In that 
sense, (1) is the same as '(3) with auto-committed offsets reset'. In fact, 
resetting these offsets using Kafka admin commands gives you much better 
control over where you want to start processing; e.g., you could resume from 24 
hours ago rather than from Kafka's max retention period.





was (Author: rangadi):
(3) is same as setting "auto.offset.reset" Kafka consumer config to "earliest".
(4) is the default behavior. Same as setting ""auto.offset.reset" config to 
"latest".

It is better not to mix 'checkpointed' offsets and 'auto_committed' offsets. If 
you restart a job scratch, 'checkpointed' offsets are thrown out as well.

(1) & (2) might be useful in the case of 'auto_committed' offsets. User can 
always remove auto_committed offsets in Kafka through admin commands. In that 
sense, (1) is same as '(3) with auto committed offsets reset.'. In fact, 
resetting these offsets using Kafka admin commands gives you much better 
control on where you want to start processing. E.g. you could resume from 24 
hours ago rather then from max retention period for Kafka.




> add an option 'FirstPollOffsetStrategy' to KafkaIO
> --
>
> Key: BEAM-3093
> URL: https://issues.apache.org/jira/browse/BEAM-3093
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Kenneth Knowles
>
> This is a feature borrowed from Storm KafkaSpout.
> *What's the issue?*
> In KafkaIO, when the offset is stored either in a checkpoint or auto_committed, it 
> cannot be changed in the application to force reading from earliest/latest. 
> This feature is important for resetting the start offset when relaunching a job.
> *Proposed solution:*
> By borrowing the FirstPollOffsetStrategy concept, users can have more options:
> 1). *{{EARLIEST}}*: always start_from_beginning no matter of what's in 
> checkpoint/auto_commit;
> 2). *{{LATEST}}*: always start_from_latest no matter of what's in 
> checkpoint/auto_commit;
> 3). *{{UNCOMMITTED_EARLIEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_beginning, otherwise start_from_previous_offset;
> 4). *{{UNCOMMITTED_LATEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_latest, otherwise start_from_previous_offset;
> [~rangadi], any comments?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-3093) add an option 'FirstPollOffsetStrategy' to KafkaIO

2017-10-23 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216107#comment-16216107
 ] 

Raghu Angadi commented on BEAM-3093:


(3) is the same as setting the "auto.offset.reset" Kafka consumer config to "earliest".
(4) is the default behavior, the same as setting the "auto.offset.reset" config to 
"latest".

It is better not to mix 'checkpointed' offsets and 'auto_committed' offsets. If 
you restart a job from scratch, 'checkpointed' offsets are thrown out as well.

(1) & (2) might be useful in the case of 'auto_committed' offsets. Users can 
always remove auto_committed offsets in Kafka through admin commands. In that 
sense, (1) is the same as '(3) with auto-committed offsets reset'. In fact, 
resetting these offsets using Kafka admin commands gives you much better 
control over where you want to start processing; e.g., you could resume from 24 
hours ago rather than from Kafka's max retention period.
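
To make the mapping concrete, a hedged sketch of expressing (3)/(4) purely through 
consumer config (servers and topic are placeholders):

{code}
// (3) UNCOMMITTED_EARLIEST: with no committed offset, start from the beginning.
// (4) UNCOMMITTED_LATEST is the consumer default ("latest"), so needs no override.
KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .updateConsumerProperties(
        ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
{code}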




> add an option 'FirstPollOffsetStrategy' to KafkaIO
> --
>
> Key: BEAM-3093
> URL: https://issues.apache.org/jira/browse/BEAM-3093
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xu Mingmin
>Assignee: Kenneth Knowles
>
> This is a feature borrowed from Storm KafkaSpout.
> *What's the issue?*
> In KafkaIO, when the offset is stored either in a checkpoint or auto_committed, it 
> cannot be changed in the application to force reading from earliest/latest. 
> This feature is important for resetting the start offset when relaunching a job.
> *Proposed solution:*
> By borrowing the FirstPollOffsetStrategy concept, users can have more options:
> 1). *{{EARLIEST}}*: always start_from_beginning no matter of what's in 
> checkpoint/auto_commit;
> 2). *{{LATEST}}*: always start_from_latest no matter of what's in 
> checkpoint/auto_commit;
> 3). *{{UNCOMMITTED_EARLIEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_beginning, otherwise start_from_previous_offset;
> 4). *{{UNCOMMITTED_LATEST}}*: if no offset in checkpoint/auto_commit then 
> start_from_latest, otherwise start_from_previous_offset;
> [~rangadi], any comments?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


  1   2   >