[jira] [Created] (BEAM-12585) Consider making NullableCoder a StandardCoder

2021-07-07 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12585:
-------------------------------

 Summary: Consider making NullableCoder a StandardCoder
 Key: BEAM-12585
 URL: https://issues.apache.org/jira/browse/BEAM-12585
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core, sdk-py-core
Reporter: Boyuan Zhang


Making NullableCoder a standard coder would help cross-language pipelines 
handle null-key KafkaRecords. See discussion: 
https://lists.apache.org/thread.html/r1740cb29e46644a08b2d1755900dd6dcd5c62b937da1a01148edfcb9%40%3Cdev.beam.apache.org%3E
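
For illustration, a minimal sketch (assumed usage, Java-SDK-only today) of 
wrapping the key coder in NullableCoder so a null Kafka key can be encoded; 
making NullableCoder a standard coder would give this wrapper a well-known 
URN so other SDKs could decode the same bytes:

{code:java}
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.KV;

// A null-key record needs a key coder that accepts null: NullableCoder
// prefixes each encoded value with a present/absent marker byte.
Coder<KV<String, String>> kvCoder =
    KvCoder.of(NullableCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of());
{code}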



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-07-02 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-12494:

Resolution: Won't Fix
Status: Resolved  (was: Open)

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.28.0
> Environment: IntelliJ community version, Maven, Windows, Dataflow 
> version 2.28.0
>Reporter: Jasminder pal singh sehgal
>Priority: P2
> Fix For: Not applicable
>
> Attachments: CodeSnippet.JPG, LogsStreamingEngine.txt, 
> SuccessfulJobRun-KafkaIngestion.txt, TimeOutLogs_KafkaIngestion.txt, 
> image-2021-06-16-16-54-25-161.png, image-2021-06-16-16-55-57-509.png, 
> image-2021-06-20-22-20-24-363.png, image-2021-06-20-22-23-14-052.png, 
> image-2021-06-21-15-00-09-851.png
>
>
> Hello,
> Our team is facing an issue when launching a streaming Dataflow Kafka job 
> from IntelliJ against a Kafka cluster hosted on a private subnet.
> The hypothesis is that during graph construction [0], Beam locally tries to 
> execute the code and check all the connections. In our case, we don't have 
> access to the subnet from IntelliJ or from the Cloud console. We do have 
> access when a Compute Engine instance is created within that subnet.
> We reached out to Google support and they suggested that we raise a defect 
> with you. The following code throws a *time-out* error when we execute it 
> through IntelliJ.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.read()
>     .withConsumerConfigUpdates(propertyBuilder)
>     .withConsumerConfigUpdates(
>         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
>     .withBootstrapServers(options.getBootstrapServers())
>     .withTopics(topicsList)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .commitOffsetsInFinalize()
>     // .withMaxNumRecords(5)
> )
> {code}
> But if we uncomment
> {code:java}
> .withMaxNumRecords(){code}
> the code works perfectly and we are able to spin up a Dataflow job in the 
> desired subnet to ingest the Kafka stream.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.read()
>     .withConsumerConfigUpdates(propertyBuilder)
>     .withConsumerConfigUpdates(
>         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
>     .withBootstrapServers(options.getBootstrapServers())
>     .withTopics(topicsList)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .commitOffsetsInFinalize()
>     .withMaxNumRecords(5)
> )
> {code}
> The issue with the above code is that Dataflow will stop after ingesting the 
> given number of records and act like batch ingestion instead of streaming, 
> which we don't want.
> *Google support team hypothesis:*
> The current hypothesis is that the issue is happening in 
> `KafkaUnboundedSource.split()` [1], which fails because it is unable to get 
> topic information.
> The first point is that `withMaxNumRecords` is intended for testing [2]; 
> when specified, the unbounded read is converted into a bounded read in 
> `BoundedReadFromUnboundedSource` [3], but without `withMaxNumRecords` the 
> pipeline stays unbounded.
> When the pipeline is bounded (withMaxNumRecords specified), `split()` 
> happens on a Dataflow worker in `SplitFn` [4]. Since it ran on Dataflow, it 
> had no issue connecting to Kafka.
> But when the pipeline is unbounded (withMaxNumRecords commented out), 
> `split()` is called while the pipeline is built locally at graph 
> construction [5][6], at which point it does not have access to Kafka.
>  
> [0] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
> [1] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57]
> [2] [https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-]
> [3] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193]
> [4] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169]
> [5] [https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87]
> [6] [https://c

[jira] [Commented] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-07-02 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373803#comment-17373803
 ] 

Boyuan Zhang commented on BEAM-12494:
-------------------------------------

I believe this is intended behavior on runner v1. If you want to bypass this 
issue, there are two options:
* Use runner v2 on Dataflow instead. To do so, please reach out to Dataflow 
Customer Support.
* Use withTopicPartitions() instead of withTopics() to read from Kafka (a 
sketch follows below).
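
A minimal sketch of the second option (the topic name and partition numbers 
are assumptions for illustration). Listing partitions explicitly avoids 
contacting the broker at graph construction time:

{code:java}
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: read from explicitly listed partitions so no broker lookup is
// needed while the pipeline graph is constructed locally.
pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
    .withBootstrapServers(options.getBootstrapServers())
    .withTopicPartitions(ImmutableList.of(
        new TopicPartition("my-topic", 0),
        new TopicPartition("my-topic", 1)))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .commitOffsetsInFinalize());
{code}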

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Created] (BEAM-12570) SDK should close data output stream when it knows current bundle has failed

2021-07-01 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12570:
-------------------------------

 Summary: SDK should close data output stream when it knows current 
bundle has failed
 Key: BEAM-12570
 URL: https://issues.apache.org/jira/browse/BEAM-12570
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-harness, sdk-py-harness
Reporter: Boyuan Zhang
Assignee: Luke Cwik


Currently, the Java and Python SDKs don't close the data output stream when 
the SDK throws errors, which results in a slow leak.
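
A minimal sketch of the intended behavior (the helper names are hypothetical, 
not the actual harness code):

{code:java}
import java.io.OutputStream;

// Close the data output stream on both success and failure, so a failed
// bundle does not leak the stream.
OutputStream dataStream = openDataStreamForBundle();  // hypothetical helper
try {
  processBundle(dataStream);                          // hypothetical helper
} finally {
  dataStream.close();  // currently this close is skipped when processBundle throws
}
{code}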



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12522) @InitialRestriction and @SplitRestriction should be able to access side inputs like @ProcessElement

2021-06-21 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12522:
-------------------------------

 Summary: @InitialRestriction and @SplitRestriction should be able 
to access side inputs like @ProcessElement
 Key: BEAM-12522
 URL: https://issues.apache.org/jira/browse/BEAM-12522
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core, sdk-java-harness
Reporter: Boyuan Zhang


@InitialRestriction and @SplitRestriction cannot access side inputs because 
of DoFnSignature checks, and the non-portable SplittableParDo doesn't copy 
side inputs for its expansions.
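
A minimal sketch of the desired behavior (the DoFn and the "maxOffset" side 
input tag are hypothetical; today the DoFnSignature check rejects the side 
input parameter on the restriction methods):

{code:java}
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class SideInputSdf extends DoFn<String, String> {
  @GetInitialRestriction
  public OffsetRange initialRestriction(
      @Element String element, @SideInput("maxOffset") Long max) {
    // Desired: size the initial restriction from a side input.
    return new OffsetRange(0, max);
  }

  @ProcessElement
  public void process(
      RestrictionTracker<OffsetRange, Long> tracker,
      @Element String element,
      @SideInput("maxOffset") Long max,  // already allowed here today
      OutputReceiver<String> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(element + ":" + i);
    }
  }
}
{code}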



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-06-21 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366794#comment-17366794
 ] 

Boyuan Zhang commented on BEAM-12494:
-------------------------------------

Are you able to SSH into your Kafka cluster? If so, you can run 
`./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name` 
to find your partitions.

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Commented] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-06-21 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366781#comment-17366781
 ] 

Boyuan Zhang commented on BEAM-12494:
-------------------------------------

Hi,

For the 400 bad-request error on job creation, as I mentioned above, if you 
want to use runner v2 you need to ask Dataflow customer support to add your 
project to the allowlist.

For the partition problem, do you have warning/error messages from your job?

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Comment Edited] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-06-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364519#comment-17364519
 ] 

Boyuan Zhang edited comment on BEAM-12494 at 6/16/21, 9:07 PM:
---------------------------------------------------------------

Hi Jasminder,

Would you like to share the pipeline code which uses `withTopicPartitions()` 
and the full console output when you get the failure?


was (Author: boyuanz):
Hi Jasminder,

Would you like to share the pipeline code which uses `withTopicPartitions()`?

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Commented] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-06-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364519#comment-17364519
 ] 

Boyuan Zhang commented on BEAM-12494:
-------------------------------------

Hi Jasminder,

Would you like to share the pipeline code which uses `withTopicPartitions()`?

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Updated] (BEAM-12475) When bundle processors are re-used, do not respond to splits for previous bundles.

2021-06-16 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-12475:

Fix Version/s: 2.31.0

> When bundle processors are re-used, do not respond to splits for previous 
> bundles.
> --------------------------------------------------------------------------
>
> Key: BEAM-12475
> URL: https://issues.apache.org/jira/browse/BEAM-12475
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Affects Versions: 2.30.0
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: P1
> Fix For: 2.31.0
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The error manifests in the following way:
> (thread1) Start processing bundle A
> (thread2) Receive request to split for bundle A
> (thread1) Finish bundle A
> (thread1) Reset bundle processor
> (thread1) Start processing bundle B
> (thread2) Process split for bundle A on bundle B
> Runners, such as Dataflow, that ensure the self-reported number of elements 
> processed by the SDK is equal to the number they expected to be processed 
> (taking into account the runner's understanding of all splits) can detect 
> this and reject A and B as invalid, but we should fix this race condition.
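
For illustration, a minimal sketch of one possible guard (the names and 
structure are assumptions, not the actual fn-execution code): tag each split 
request with the bundle it targets and drop requests whose bundle is no 
longer active.

{code:java}
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Placeholder result type, for illustration only.
interface SplitResult {}

class BundleProcessor {
  private final AtomicReference<String> activeBundleId = new AtomicReference<>();

  void startBundle(String bundleId) { activeBundleId.set(bundleId); }  // thread1
  void finishBundle() { activeBundleId.set(null); }                    // thread1

  // thread2: honor the split only if it targets the bundle being processed.
  // (A real fix must also make this check-and-split atomic with finishBundle.)
  Optional<SplitResult> trySplit(String requestBundleId, double fraction) {
    if (!requestBundleId.equals(activeBundleId.get())) {
      return Optional.empty();  // stale request for a previous bundle: ignore
    }
    return Optional.of(splitActiveBundleAt(fraction));
  }

  private SplitResult splitActiveBundleAt(double fraction) {
    // The actual split logic lives in the runner/SDK harness; elided here.
    return new SplitResult() {};
  }
}
{code}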



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12475) When bundle processors are re-used, do not respond to splits for previous bundles.

2021-06-16 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-12475:

Resolution: Fixed
Status: Resolved  (was: Open)

> When bundle processors are re-used, do not respond to splits for previous 
> bundles.
> --------------------------------------------------------------------------
>
> Key: BEAM-12475
> URL: https://issues.apache.org/jira/browse/BEAM-12475



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-06-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364367#comment-17364367
 ] 

Boyuan Zhang commented on BEAM-12494:
-------------------------------------

This issue also goes away if you use runner v2 on Dataflow 
(--experiments=use_runner_v2, plus asking support to add your project to the 
allowlist).
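
A sketch of enabling the experiment programmatically (option wiring assumed; 
equivalent to the command-line flag above):

{code:java}
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Inside main(String[] args): build the options, then add the experiment.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
ExperimentalOptions.addExperiment(
    options.as(ExperimentalOptions.class), "use_runner_v2");
{code}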

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Comment Edited] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-06-15 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363994#comment-17363994
 ] 

Boyuan Zhang edited comment on BEAM-12494 at 6/16/21, 12:39 AM:


Hi Jasminder,

Your investigation is correct and thanks for doing so much work : ) 

One workaround is that when constructing your KafkaIO, use 
[withTopicPartitions()|https://github.com/apache/beam/blob/36158013318fcfd7f18a432ccbc9b18b945a430c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L877-L882]
 instead of using withTopics().




was (Author: boyuanz):
Hi Jasminder,

You investigation is correct and thanks for doing so much work : ) 

One workaround is that when constructing your KafkaIO, use 
[withTopicPartitions()|https://github.com/apache/beam/blob/36158013318fcfd7f18a432ccbc9b18b945a430c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L877-L882]
 instead of using withTopics().



> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Commented] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

2021-06-15 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363994#comment-17363994
 ] 

Boyuan Zhang commented on BEAM-12494:
-------------------------------------

Hi Jasminder,

You investigation is correct and thanks for doing so much work : ) 

One workaround is that when constructing your KafkaIO, use 
[withTopicPartitions()|https://github.com/apache/beam/blob/36158013318fcfd7f18a432ccbc9b18b945a430c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L877-L882]
 instead of using withTopics().



> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
> Key: BEAM-12494
> URL: https://issues.apache.org/jira/browse/BEAM-12494

[jira] [Updated] (BEAM-12459) Watch does not properly advance the watermark by default

2021-06-08 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-12459:

Priority: P2  (was: P1)

> Watch does not properly advance the watermark by default
> --------------------------------------------------------
>
> Key: BEAM-12459
> URL: https://issues.apache.org/jira/browse/BEAM-12459
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Daniel Collins
>Assignee: Luke Cwik
>Priority: P2
>
> Assigning to Luke who has made substantial changes to this class most 
> recently.
>  
> It appears after investigation that when using Watch in the default 
> configuration, the global watermark is not advanced properly, even though 
> the Watch documentation claims it should be 
> ([https://github.com/apache/beam/blob/8922c1cf23c093262af9e4570d69947a9a749506/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L118]).
> Below is example code using Watch that will not advance the watermark, but 
> should:
> ```
> Watch.growthOf(
>     new PollFn() {
>       @Override
>       public PollResult apply(TopicPath element, Context c) {
>         return PollResult.incomplete(Instant.now(), List.of(0));
>       }
>     })
>     .withPollInterval(...)
>     .withTerminationPerInput(Watch.Growth.never());
> ```
> I've been advised that changing the return statement to `return 
> PollResult.incomplete(Instant.now(), List.of(0)).withWatermark(Instant.now());` 
> will resolve this issue, but the `withWatermark` function is documented as: 
> "By default, the watermark for a particular input is computed from a poll 
> result as "earliest timestamp of new elements in this poll result". It can 
> also be set explicitly via {@link Growth.PollResult#withWatermark} if the 
> {@link Growth.PollFn} can provide a more optimistic estimate." The goal is 
> not to provide a more optimistic estimate, but to allow any advancement at 
> all. If withWatermark is needed to close windows, this function should be 
> required (or at least more prominent in all example code).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12458) Using SubscriberOptions.setPartitions results in pipeline construction error

2021-06-07 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12458:
-------------------------------

 Summary: Using SubscriberOptions.setPartitions results in pipeline 
construction error
 Key: BEAM-12458
 URL: https://issues.apache.org/jira/browse/BEAM-12458
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Affects Versions: 2.30.0
Reporter: Boyuan Zhang
Assignee: Daniel Collins


With a pipeline like:
```
SubscriberOptions subscriberOptions =
    SubscriberOptions.newBuilder()
        .setSubscriptionPath(SubscriptionPath.parse(""))
        .setPartitions(ImmutableSet.of(Partition.of(0)))
        .build();

pipeline
    .apply("Create elements", PubsubLiteIO.read(subscriberOptions));
```
it will fail at pipeline construction time:
```
java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was 
specified. Please set a coder by invoking Create.withCoder() explicitly  or a 
schema by invoking Create.withSchema().
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:363)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:277)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at 
org.apache.beam.sdk.io.gcp.pubsublite.SubscribeTransform.expand(SubscribeTransform.java:126)
at 
org.apache.beam.sdk.io.gcp.pubsublite.SubscribeTransform.expand(SubscribeTransform.java:46)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:192)
at 
org.apache.beam.sdk.io.gcp.pubsub.PubsubReadIT.testReadPubsubMessageId(PubsubReadIT.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322)
at 
org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal$1.evaluate(TestPubsubSignal.java:130)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)

[jira] [Commented] (BEAM-8218) Implement Apache PulsarIO

2021-06-04 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17357542#comment-17357542
 ] 

Boyuan Zhang commented on BEAM-8218:


I don't think anyone is working on this right now. I pinged Max on this JIRA 
earlier to see whether he is still working on it.

> Implement Apache PulsarIO
> -------------------------
>
> Key: BEAM-8218
> URL: https://issues.apache.org/jira/browse/BEAM-8218
> Project: Beam
>  Issue Type: Task
>  Components: io-ideas
>Reporter: Alex Van Boxel
>Priority: P3
>
> Apache Pulsar is starting to gain popularity. Having a native Beam PulsarIO 
> could be beneficial.
> [https://pulsar.apache.org/|https://pulsar.apache.org/en/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12384) Read.Bounded typeDescriptor is lost after move to SDF

2021-05-21 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17349403#comment-17349403
 ] 

Boyuan Zhang commented on BEAM-12384:
-------------------------------------

Hi Ismael, would you like to elaborate on what the failures look like and 
what kind of problem this is?

> Read.Bounded typeDescriptor is lost after move to SDF
> -----------------------------------------------------
>
> Key: BEAM-12384
> URL: https://issues.apache.org/jira/browse/BEAM-12384
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: P2
> Fix For: 2.31.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> While debugging a downstream transform after Read, I noticed that the 
> typeDescriptor information was not set up correctly on the Read transform.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12353) org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner is failing on Dataflow runner v2

2021-05-20 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348613#comment-17348613
 ] 

Boyuan Zhang commented on BEAM-12353:
-------------------------------------

No, the Dataflow ValidatesRunner suite doesn't include runner v2; we have a 
separate target to keep the test time down. Would you like to elaborate on 
what the test is for?

> org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner
>  is failing on Dataflow runner v2
> ---------------------------------------------------------------------------
>
> Key: BEAM-12353
> URL: https://issues.apache.org/jira/browse/BEAM-12353
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Priority: P2
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> WindowTests.testRewindowWithTimestampCombiner was introduced recently by 
> https://github.com/apache/beam/pull/14667. This test passes on Dataflow 
> runner v1 in both batch and streaming, but fails on Dataflow runner v2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12358) PR14802 breaks beam_PostCommit_Java_ValidatesRunner_Samza

2021-05-18 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12358:
---

 Summary: PR14802 breaks beam_PostCommit_Java_ValidatesRunner_Samza
 Key: BEAM-12358
 URL: https://issues.apache.org/jira/browse/BEAM-12358
 Project: Beam
  Issue Type: Test
  Components: runner-samza, test-failures
Reporter: Boyuan Zhang


https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/
https://github.com/apache/beam/pull/14802



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12353) org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner is failing on Dataflow runner v2

2021-05-18 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347093#comment-17347093
 ] 

Boyuan Zhang commented on BEAM-12353:
-

This test fails consistently on runner v2 batch and is flaky on runner v2 
streaming.

> org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner
>  is failing on Dataflow runner v2
> ---
>
> Key: BEAM-12353
> URL: https://issues.apache.org/jira/browse/BEAM-12353
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Priority: P2
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> WindowTests.testRewindowWithTimestampCombiner was introduced recently by 
> https://github.com/apache/beam/pull/14667. This test passes on Dataflow 
> runner v1 in both batch and streaming, but fails on Dataflow runner v2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12354) :runners:direct-java:runMobileGamingJavaDirect fails on beam_PostRelease_NightlySnapshot several times

2021-05-17 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-12354:

Description: 
https://ci-beam.apache.org/job/beam_PostRelease_NightlySnapshot is failing on 
:runners:direct-java:runMobileGamingJavaDirect. Typical error message is:

May 17, 2021 11:21:20 AM 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl 
createTable
INFO: Trying to create BigQuery table: 
apache-beam-testing:beam_postrelease_mobile_gaming.leaderboard_DirectRunner_user
 
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java 
(default-cli) on project word-count-beam: An exception occured while executing 
the Java class. java.lang.RuntimeException: java.io.IOException: Premature EOF 
-> [Help 1]

There are also some SEVERE logs saying that "ManagedChannelImpl{logId=39, 
target=bigquerystorage.googleapis.com:443} was not shutdown properly", but this 
doesn't seem to affect execution immediately, so it might be a red herring.

Build scan: 
https://scans.gradle.com/s/a3oc76o2diw72/console-log?anchor=15629&task=:runners:direct-java:runMobileGamingJavaDirect

  was:
https://ci-beam.apache.org/job/beam_PostRelease_NightlySnapshot is failing on 
:runners:direct-java:runMobileGamingJavaDirect. Typical error message is:

May 17, 2021 11:21:20 AM 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl 
createTable
INFO: Trying to create BigQuery table: 
apache-beam-testing:beam_postrelease_mobile_gaming.leaderboard_DirectRunner_user
 
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java 
(default-cli) on project word-count-beam: An exception occured while executing 
the Java class. java.lang.RuntimeException: java.io.IOException: Premature EOF 
-> [Help 1]

There are also some SEVERE logs saying that "ManagedChannelImpl{logId=39, 
target=bigquerystorage.googleapis.com:443} was not shutdown properly", but this 
doesn't seem to affect execution immediately, so it might be a red herring.


> :runners:direct-java:runMobileGamingJavaDirect fails on 
> beam_PostRelease_NightlySnapshot several times
> ---
>
> Key: BEAM-12354
> URL: https://issues.apache.org/jira/browse/BEAM-12354
> Project: Beam
>  Issue Type: Test
>  Components: test-failures
>Reporter: Boyuan Zhang
>Priority: P2
>
> https://ci-beam.apache.org/job/beam_PostRelease_NightlySnapshot is failing on 
> :runners:direct-java:runMobileGamingJavaDirect. Typical error message is:
> May 17, 2021 11:21:20 AM 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl 
> createTable  
> INFO: Trying to create BigQuery table: 
> apache-beam-testing:beam_postrelease_mobile_gaming.leaderboard_DirectRunner_user
>
> [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java 
> (default-cli) on project word-count-beam: An exception occured while 
> executing the Java class. java.lang.RuntimeException: java.io.IOException: 
> Premature EOF -> [Help 1]
> There are also some SEVERE logs saying that "ManagedChannelImpl{logId=39, 
> target=bigquerystorage.googleapis.com:443} was not shutdown properly", but 
> this doesn't seem to affect execution immediately, so it might be a red 
> herring.
> Build scan: 
> https://scans.gradle.com/s/a3oc76o2diw72/console-log?anchor=15629&task=:runners:direct-java:runMobileGamingJavaDirect



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12354) :runners:direct-java:runMobileGamingJavaDirect fails on beam_PostRelease_NightlySnapshot several times

2021-05-17 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12354:
---

 Summary: :runners:direct-java:runMobileGamingJavaDirect fails on 
beam_PostRelease_NightlySnapshot several times
 Key: BEAM-12354
 URL: https://issues.apache.org/jira/browse/BEAM-12354
 Project: Beam
  Issue Type: Test
  Components: test-failures
Reporter: Boyuan Zhang


https://ci-beam.apache.org/job/beam_PostRelease_NightlySnapshot is failing on 
:runners:direct-java:runMobileGamingJavaDirect. Typical error message is:

May 17, 2021 11:21:20 AM 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl 
createTable
INFO: Trying to create BigQuery table: 
apache-beam-testing:beam_postrelease_mobile_gaming.leaderboard_DirectRunner_user
 
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java 
(default-cli) on project word-count-beam: An exception occured while executing 
the Java class. java.lang.RuntimeException: java.io.IOException: Premature EOF 
-> [Help 1]

There are also some SEVERE logs saying that "ManagedChannelImpl{logId=39, 
target=bigquerystorage.googleapis.com:443} was not shutdown properly", but this 
doesn't seem to affect execution immediately, so it might be a red herring.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12353) org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner is failing on Dataflow runner v2

2021-05-17 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12353:
---

 Summary: 
org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testRewindowWithTimestampCombiner
 is failing on Dataflow runner v2
 Key: BEAM-12353
 URL: https://issues.apache.org/jira/browse/BEAM-12353
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Boyuan Zhang


WindowTests.testRewindowWithTimestampCombiner was introduced recently by 
https://github.com/apache/beam/pull/14667. This test passes on Dataflow runner 
v1 in both batch and streaming, but fails on Dataflow runner v2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12352) GcsIOIntegrationTest.test_copy_rewrite_token and GcsIOIntegrationTest.test_copy_batch_rewrite_token starts to fail from 05/15

2021-05-17 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12352:
---

 Summary: GcsIOIntegrationTest.test_copy_rewrite_token and 
GcsIOIntegrationTest.test_copy_batch_rewrite_token starts to fail from 05/15
 Key: BEAM-12352
 URL: https://issues.apache.org/jira/browse/BEAM-12352
 Project: Beam
  Issue Type: Improvement
  Components: test-failures
Reporter: Boyuan Zhang
Assignee: Udi Meiri


apache_beam.io.gcp.gcsio_integration_test.GcsIOIntegrationTest.test_copy_batch_rewrite_token
 and 
apache_beam.io.gcp.gcsio_integration_test.GcsIOIntegrationTest.test_copy_rewrite_token
 are failing in python postcommit suite.

The first failure is 
https://ci-beam.apache.org/job/beam_PostCommit_Python36/3890/ and I didn't find 
any recent related changes that could lead to this failure.

Udi, since you created the test, would you like to take a look and find the 
root cause?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-10861) Adds URNs and payloads to PubSub transforms

2021-05-17 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-10861:

Fix Version/s: 2.29.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Adds URNs and payloads to PubSub transforms
> ---
>
> Key: BEAM-10861
> URL: https://issues.apache.org/jira/browse/BEAM-10861
> Project: Beam
>  Issue Type: New Feature
>  Components: cross-language, runner-dataflow, sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Priority: P1
> Fix For: 2.29.0
>
>  Time Spent: 35h 40m
>  Remaining Estimate: 0h
>
> This is needed to allow runners to override portable definition of transforms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12336) JmsIO should allow using custom timestamp policy to track watermark

2021-05-13 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12336:
---

 Summary: JmsIO should allow using custom timestamp policy to track 
watermark
 Key: BEAM-12336
 URL: https://issues.apache.org/jira/browse/BEAM-12336
 Project: Beam
  Issue Type: Improvement
  Components: io-java-jms
Reporter: Boyuan Zhang


Similar to 
[CustomTimestampPolicyWithLimitedDelay|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java]
 in KafkaIO, JmsIO could add the same functionality to allow configuring a 
timestamp policy, as sketched below.
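
For reference, a minimal sketch of how KafkaIO wires this in today; the broker, 
topic, and timestamp extraction below are illustrative placeholders, and a 
JmsIO equivalent would presumably expose a similar factory hook:

{code:java}
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.joda.time.Duration;
import org.joda.time.Instant;

// KafkaIO accepts a timestamp policy factory; the policy below extracts event
// time from each record and tolerates one minute of out-of-orderness.
KafkaIO.<String, String>read()
    .withBootstrapServers("broker:9092")  // placeholder broker
    .withTopic("events")                  // placeholder topic
    .withTimestampPolicyFactory(
        (tp, previousWatermark) ->
            new CustomTimestampPolicyWithLimitedDelay<>(
                record -> new Instant(record.getTimestamp()),
                Duration.standardMinutes(1),
                previousWatermark));
{code}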



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12335) Apply basic fusion to Java DirectRunner to avoid keeping all intermediate results in memory

2021-05-13 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12335:
---

 Summary: Apply basic fusion to Java DirectRunner to avoid keeping 
all intermediate results in memory
 Key: BEAM-12335
 URL: https://issues.apache.org/jira/browse/BEAM-12335
 Project: Beam
  Issue Type: Improvement
  Components: runner-direct
Reporter: Boyuan Zhang


The current Java DirectRunner doesn't fuse transforms into steps; instead, it 
executes each transform almost one by one, which results in memory pressure 
when any transform is high-fanout.

We already have simple fusion logic in the Java 
SDK(https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java).
 The work remaining here might be:
* Apply such fusion in the DirectRunner
* Change the DirectRunner to be able to run the fused steps.

I understand that the DirectRunner isn't expected to process large volumes of 
data and that changing DirectRunner execution might be a fair amount of work.
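
A rough, exploratory sketch of the first bullet above, assuming the pipeline 
is first translated to its proto form (this is not a final design):

{code:java}
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.FusedPipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.Pipeline;

Pipeline pipeline = Pipeline.create();
// ... build the user's pipeline here ...

// Run the existing greedy fuser over the pipeline proto to get fused stages.
RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
FusedPipeline fused = GreedyPipelineFuser.fuse(proto);
for (ExecutableStage stage : fused.getFusedStages()) {
  // Each stage groups several transforms; executing a stage end-to-end means
  // only the stage boundary PCollections need to be materialized.
  System.out.println(stage.getInputPCollection().getId());
}
{code}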



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-10670) Make non-portable Splittable DoFn the only option when executing Java "Read" transforms

2021-05-12 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343398#comment-17343398
 ] 

Boyuan Zhang commented on BEAM-10670:
-

I can help with this. I believe at this moment the only runner going with SDF 
is the Dataflow one.

> Make non-portable Splittable DoFn the only option when executing Java "Read" 
> transforms
> ---
>
> Key: BEAM-10670
> URL: https://issues.apache.org/jira/browse/BEAM-10670
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Luke Cwik
>Assignee: Ismaël Mejía
>Priority: P1
>  Labels: Clarified
> Fix For: 2.30.0
>
>  Time Spent: 40.5h
>  Remaining Estimate: 0h
>
> All runners seem to be capable of migrating to splittable DoFn for 
> non-portable execution except for Dataflow runner v1 which will internalize 
> the current primitive read implementation that is shared across runner 
> implementations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12287) beam_PerformanceTests_Kafka_IO failing due to :sdks:java:container:pullLicenses failure

2021-05-11 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342931#comment-17342931
 ] 

Boyuan Zhang commented on BEAM-12287:
-

I can confirm that https://github.com/apache/beam/pull/14669 causes this 
failure, though I have no idea why the KafkaIO suite is the only suite failing 
on this: 
https://ci-beam.apache.org/job/beam_PerformanceTests_Kafka_IO/2282/
https://github.com/apache/beam/pull/14787

> beam_PerformanceTests_Kafka_IO failing due to 
> :sdks:java:container:pullLicenses failure
> ---
>
> Key: BEAM-12287
> URL: https://issues.apache.org/jira/browse/BEAM-12287
> Project: Beam
>  Issue Type: Bug
>  Components: test-failures
>Reporter: Brian Hulette
>Assignee: Chamikara Madhusanka Jayalath
>Priority: P1
>
> {code}
> 05:52:37 ERROR:root:['ST4-4.3']
> 05:52:37 ERROR:root: Licenses were 
> not able to be pulled automatically for some dependencies. Please search 
> source code of the dependencies on the internet and add "license" and 
> "notice" (if available) field to 
> /home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Kafka_IO/src/sdks/java/container/license_scripts/dep_urls_java.yaml
>  for each missing license. Dependency List: [ST4-4.3]
> 05:52:37 INFO:root:pull_licenses_java.py failed. It took 3.862233 seconds 
> with 16 threads.
> 05:52:37 Traceback (most recent call last):
> 05:52:37   File 
> "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Kafka_IO/src/sdks/java/container/license_scripts/pull_licenses_java.py",
>  line 315, in 
> 05:52:37 error_msg)
> 05:52:37 RuntimeError: ('2 error(s) occurred.', 
> [' Licenses were not able to be 
> pulled automatically for some dependencies. Please search source code of the 
> dependencies on the internet and add "license" and "notice" (if available) 
> field to 
> /home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Kafka_IO/src/sdks/java/container/license_scripts/dep_urls_java.yaml
>  for each missing license. Dependency List: [ST4-4.3]', 
> ' License type of some dependencies 
> were not identified. The license type is used to decide whether the source 
> code of the dependency should be pulled or not. Please add "type" field to 
> /home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Kafka_IO/src/sdks/java/container/license_scripts/dep_urls_java.yaml
>  for each dependency. Dependency List: [ST4-4.3]'])
> 05:52:37 Watching 1709 directories to track changes
> 05:52:37 Watching 2065 directories to track changes
> 05:52:37 :sdks:java:container:pullLicenses (Thread[Execution  for ':' 
> Thread 7,5,main]) completed. Took 14.202 secs.
> {code}
> Example failure: 
> https://ci-beam.apache.org/job/beam_PerformanceTests_Kafka_IO/2264



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12114) Eliminate beam_fn_api from KafkaIO expansion

2021-05-11 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-12114:

Fix Version/s: 2.30.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Eliminate beam_fn_api from KafkaIO expansion
> 
>
> Key: BEAM-12114
> URL: https://issues.apache.org/jira/browse/BEAM-12114
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-kafka
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P2
> Fix For: 2.30.0
>
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> We are going to use the Splittable DoFn expansion by default for KafkaIO 
> without looking into the beam_fn_api flag. But KafkaIO provides overrides for 
> runners that decide to not use such expansion. Pipeline authors can also use 
> beam_fn_api_use_deprecated_read to switch to the old expansion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11998) Portable runners should be able to issue checkpoints to Splittable DoFn

2021-05-10 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342150#comment-17342150
 ] 

Boyuan Zhang commented on BEAM-11998:
-

Hi [~zhao1116], I'm happy to help you with this. Let me know if you have any 
questions or anything on your mind that you think I can help with.

> Portable runners should be able to issue checkpoints to Splittable DoFn
> ---
>
> Key: BEAM-11998
> URL: https://issues.apache.org/jira/browse/BEAM-11998
> Project: Beam
>  Issue Type: New Feature
>  Components: cross-language, runner-flink, runner-spark
>Reporter: Boyuan Zhang
>Priority: P2
> Attachments: read.png
>
>
> To execute unbounded Splittable DoFn over fnapi in streaming mode properly, 
> portable runners should issue split(ProcessBundleSplitRequest with 
> fraction_of_remainder > 0) or simply checkpoint(ProcessBundleSplitRequest 
> with fraction_of_remainder == 0) to SDK regularly to make current bundle 
> finished processing instead of running forever.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-6466) KafkaIO doesn't commit offsets while being used as bounded source

2021-05-10 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-6466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342133#comment-17342133
 ] 

Boyuan Zhang commented on BEAM-6466:


The unbounded-as-bounded read goes through a different expansion compared to a 
normal unbounded read. In short, the unbounded-as-bounded read performs more 
like a 
DoFn(https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L94-L121),
 where there is no finalizeCheckpoint contract. So instead of a bug, this looks 
more like a feature request on the unbounded-as-bounded read to me.

> KafkaIO doesn't commit offsets while being used as bounded source
> -
>
> Key: BEAM-6466
> URL: https://issues.apache.org/jira/browse/BEAM-6466
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.9.0
>Reporter: Alexey Romanenko
>Priority: P1
>
> While using KafkaIO as bounded source (with {{withMaxReadTime()}} or 
> {{withMaxNumRecords()}}) it seems doesn't commit offsets all the time.
> See the details in [the 
> discussion|https://lists.apache.org/thread.html/bcec8a1fb166029a4adf3f3491c407d49843406020b20f203ec3c2d2@%3Cuser.beam.apache.org%3E]
>  on user@list.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11828) JmsIO is not acknowledging messages correctly

2021-04-27 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17333402#comment-17333402
 ] 

Boyuan Zhang commented on BEAM-11828:
-

Talking to one customer about the same problem, the customer found that there 
is a high chance that reader.close() is called before 
CheckpointMark.finalizeCheckpoint() is called, but 
CheckpointMark.finalizeCheckpoint() requires that the reader still hold the 
session. I think the runner contract intends that the reader may be closed 
before the runner tries to finalize the CheckpointMark, so this indicates an 
implementation problem inside JmsIO.

> JmsIO is not acknowledging messages correctly
> -
>
> Key: BEAM-11828
> URL: https://issues.apache.org/jira/browse/BEAM-11828
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Reporter: Seifeddine Gamoudi
>Priority: P1
>
> Hello,
> We noticed that messages consumption via JmsIO does not acknowledge messages 
> properly.
> Some messages are never acknowledged which results in data loss.
> So when do messages get acknowledged by JmsIO?
> How do We make sure all messages are being acknowledged?
>  
> Best regards,
> Seif



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11998) Portable runners should be able to issue checkpoints to Splittable DoFn

2021-04-27 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1750#comment-1750
 ] 

Boyuan Zhang commented on BEAM-11998:
-

You can always use --experiments=use_deprecated_read as a workaround.

> Portable runners should be able to issue checkpoints to Splittable DoFn
> ---
>
> Key: BEAM-11998
> URL: https://issues.apache.org/jira/browse/BEAM-11998
> Project: Beam
>  Issue Type: New Feature
>  Components: cross-language, runner-flink, runner-spark
>Reporter: Boyuan Zhang
>Priority: P2
>
> To execute unbounded Splittable DoFn over fnapi in streaming mode properly, 
> portable runners should issue split(ProcessBundleSplitRequest with 
> fraction_of_remainder > 0) or simply checkpoint(ProcessBundleSplitRequest 
> with fraction_of_remainder == 0) to SDK regularly to make current bundle 
> finished processing instead of running forever.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12213) Dataflow should always create v1b3 Steps in runner_v1 flavor

2021-04-22 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12213:
---

 Summary: Dataflow should always create v1b3 Steps in runner_v1 
flavor
 Key: BEAM-12213
 URL: https://issues.apache.org/jira/browse/BEAM-12213
 Project: Beam
  Issue Type: Task
  Components: runner-dataflow
Reporter: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12196) Apache Beam Kafka Source Connector Idle Partition Issue with “CustomTimeStampPolicyWithLimitedDelay”

2021-04-20 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326258#comment-17326258
 ] 

Boyuan Zhang commented on BEAM-12196:
-

Thanks, Jay.

And which API are you talking about?

> Apache Beam Kafka Source Connector Idle Partition Issue with 
> “CustomTimeStampPolicyWithLimitedDelay”
> 
>
> Key: BEAM-12196
> URL: https://issues.apache.org/jira/browse/BEAM-12196
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.28.0
> Environment: Kubernetes, Lyft Flink-operator,
>Reporter: Jay Ghiya
>Priority: P2
>
> Source is kafka for our beam pipeline. Apache beam's kafka IO connector 
> supports moving of watermark(in case of flink runner) even if any partition 
> is idle. The applications who would want to process packets based on the 
> timestamp of the packet which is included in the payload would want to use 
> "CustomTimestampPolicyWithLimitedDelay". We use FIXED WINDOWS for a minute 
> for aggregation which is dependent on notion of time. So if time does not 
> advance properly aggregation function is not called and data is missed.
> This API has functionality issues. So when the application is initialized , 
> let us just for example Topic a is used as a source with three partitions. 
> These steps were taken to reproduce the issue:
> Pump data to only one partition with a frequency with of any x seconds and 
> observation is aggregation function is not called even after several minutes.
> Now pump data to all partitions and observation is aggregation function is 
> called at end of minute as expected.
> Now pump data to only one partition and that too not till end of minute just 
> before that so that we can generate a idle partition scenario and observation 
> is it works as expected NOW.
> So the sort of summary is there is a initialization issue with this api where 
> it does not advance time but after step 2 it stabilizes and works as expected.
> This is easily reproducible and would request apache beam to fix this.
> As of now the temp fix we have gone is with LogAppendTime which works 
> flawlessly but we do not want to process packets on broker time due to 
> various application needs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12196) Apache Beam Kafka Source Connector Idle Partition Issue with “CustomTimeStampPolicyWithLimitedDelay”

2021-04-20 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326054#comment-17326054
 ] 

Boyuan Zhang commented on BEAM-12196:
-

Hi Jay,

Can you try --experiments=use_deprecated_read to see whether it helps you get 
rid of this issue?

In short, it seems like the issue is that idle partitions hold back the 
watermark.

> Apache Beam Kafka Source Connector Idle Partition Issue with 
> “CustomTimeStampPolicyWithLimitedDelay”
> 
>
> Key: BEAM-12196
> URL: https://issues.apache.org/jira/browse/BEAM-12196
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Affects Versions: 2.28.0
> Environment: Kubernetes, Lyft Flink-operator,
>Reporter: Jay Ghiya
>Priority: P2
>
> Source is kafka for our beam pipeline. Apache beam's kafka IO connector 
> supports moving of watermark(in case of flink runner) even if any partition 
> is idle. The applications who would want to process packets based on the 
> timestamp of the packet which is included in the payload would want to use 
> "CustomTimestampPolicyWithLimitedDelay". We use FIXED WINDOWS for a minute 
> for aggregation which is dependent on notion of time. So if time does not 
> advance properly aggregation function is not called and data is missed.
> This API has functionality issues. So when the application is initialized , 
> let us just for example Topic a is used as a source with three partitions. 
> These steps were taken to reproduce the issue:
> Pump data to only one partition with a frequency with of any x seconds and 
> observation is aggregation function is not called even after several minutes.
> Now pump data to all partitions and observation is aggregation function is 
> called at end of minute as expected.
> Now pump data to only one partition and that too not till end of minute just 
> before that so that we can generate a idle partition scenario and observation 
> is it works as expected NOW.
> So the sort of summary is there is a initialization issue with this api where 
> it does not advance time but after step 2 it stabilizes and works as expected.
> This is easily reproducible and would request apache beam to fix this.
> As of now the temp fix we have gone is with LogAppendTime which works 
> flawlessly but we do not want to process packets on broker time due to 
> various application needs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12193) WatchKafkaTopicPartitionDoFn reports a user counter to indicate which TopicPartition has been emitted downstream

2021-04-19 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12193:
---

 Summary: WatchKafkaTopicPartitionDoFn reports a user counter to 
indicate which TopicPartition has been emitted downstream
 Key: BEAM-12193
 URL: https://issues.apache.org/jira/browse/BEAM-12193
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Boyuan Zhang
Assignee: Boyuan Zhang


WatchKafkaTopicPartitionDoFn can use a user counter to indicate which 
TopicPartition has been emitted downstream.
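
A minimal sketch of what this could look like (the counter namespace and 
naming scheme here are illustrative, not the actual implementation):

{code:java}
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.kafka.common.TopicPartition;

class PartitionEmitCounters {
  // Bump a per-partition user counter whenever a TopicPartition is emitted
  // downstream; the namespace and name are illustrative.
  static void reportEmitted(TopicPartition topicPartition) {
    Counter counter =
        Metrics.counter(
            "WatchKafkaTopicPartitionDoFn",
            "emitted-" + topicPartition.topic() + "-" + topicPartition.partition());
    counter.inc();
  }
}
{code}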




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12192) WatchKafkaTopicPartitionDoFn should respect given topic from KafkaIO

2021-04-19 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12192:
---

 Summary: WatchKafkaTopicPartitionDoFn should respect given topic 
from KafkaIO
 Key: BEAM-12192
 URL: https://issues.apache.org/jira/browse/BEAM-12192
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Boyuan Zhang
Assignee: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-12162) Setup tpcds benchmark for dataflow runner v2

2021-04-13 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320468#comment-17320468
 ] 

Boyuan Zhang edited comment on BEAM-12162 at 4/13/21, 6:54 PM:
---

Based on the build file: 
https://github.com/apache/beam/blob/master/sdks/java/testing/tpcds/build.gradle#L105-L116,
 it only runs on the Java legacy worker.


was (Author: boyuanz):
Based on the build file, it only runs on the Java legacy worker.

> Setup tpcds benchmark for dataflow runner v2
> 
>
> Key: BEAM-12162
> URL: https://issues.apache.org/jira/browse/BEAM-12162
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, testing-tpcds
>Reporter: Boyuan Zhang
>Assignee: Rui Wang
>Priority: P2
>
> Currently the tpcds benchmark only runs on dataflow legacy worker. We also 
> want to track the performance on runner v2: 
> https://github.com/apache/beam/pull/14373/files#r612632569



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12162) Setup tpcds benchmark for dataflow runner v2

2021-04-13 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320468#comment-17320468
 ] 

Boyuan Zhang commented on BEAM-12162:
-

Based on the build file, it only runs on the Java legacy worker.

> Setup tpcds benchmark for dataflow runner v2
> 
>
> Key: BEAM-12162
> URL: https://issues.apache.org/jira/browse/BEAM-12162
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, testing-tpcds
>Reporter: Boyuan Zhang
>Assignee: Rui Wang
>Priority: P2
>
> Currently the tpcds benchmark only runs on dataflow legacy worker. We also 
> want to track the performance on runner v2: 
> https://github.com/apache/beam/pull/14373/files#r612632569



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-12162) Setup tpcds benchmark for dataflow runner v2

2021-04-13 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang reassigned BEAM-12162:
---

Assignee: Rui Wang

> Setup tpcds benchmark for dataflow runner v2
> 
>
> Key: BEAM-12162
> URL: https://issues.apache.org/jira/browse/BEAM-12162
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, testing-tpcds
>Reporter: Boyuan Zhang
>Assignee: Rui Wang
>Priority: P2
>
> Currently the tpcds benchmark only runs on dataflow legacy worker. We also 
> want to track the performance on runner v2: 
> https://github.com/apache/beam/pull/14373/files#r612632569



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12162) Setup tpcds benchmark for dataflow runner v2

2021-04-13 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12162:
---

 Summary: Setup tpcds benchmark for dataflow runner v2
 Key: BEAM-12162
 URL: https://issues.apache.org/jira/browse/BEAM-12162
 Project: Beam
  Issue Type: Task
  Components: testing-tpcds
Reporter: Boyuan Zhang


Currently the tpcds benchmark only runs on dataflow legacy worker. We also want 
to track the performance on runner v2: 
https://github.com/apache/beam/pull/14373/files#r612632569



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-12162) Setup tpcds benchmark for dataflow runner v2

2021-04-13 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-12162:

Component/s: runner-dataflow

> Setup tpcds benchmark for dataflow runner v2
> 
>
> Key: BEAM-12162
> URL: https://issues.apache.org/jira/browse/BEAM-12162
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, testing-tpcds
>Reporter: Boyuan Zhang
>Priority: P2
>
> Currently the tpcds benchmark only runs on dataflow legacy worker. We also 
> want to track the performance on runner v2: 
> https://github.com/apache/beam/pull/14373/files#r612632569



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-12160) Please fix errorprone, checkstyle and lint warnings for tpcds module

2021-04-12 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12160:
---

 Summary: Please fix errorprone, checkstyle and lint warnings for 
tpcds module
 Key: BEAM-12160
 URL: https://issues.apache.org/jira/browse/BEAM-12160
 Project: Beam
  Issue Type: Bug
  Components: testing
Reporter: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11148) Kafka commitOffsetsInFinalize OOM on Flink

2021-04-09 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318179#comment-17318179
 ] 

Boyuan Zhang commented on BEAM-11148:
-

Workaround:
Using --experiments="beam_fn_api_use_deprecated_read" should help to resolve 
this issue.

Root cause:
From 2.25.0, we switched the Read implementation from 
UnboundedSource/BoundedSource to the Splittable DoFn wrapper implementation, 
which has a different mechanism for performing the offset-commit action: only 
the portable Flink runner with a checkpoint interval configured can do that.
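
For completeness, a minimal sketch of pipeline flags under which the new 
expansion can commit offsets on Flink (the interval value is illustrative):

{code}
# Illustrative Java pipeline arguments: with the SDF-based expansion, offsets
# are only committed when Flink checkpointing is enabled.
--runner=FlinkRunner --checkpointingInterval=60000
{code}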

> Kafka commitOffsetsInFinalize OOM on Flink
> --
>
> Key: BEAM-11148
> URL: https://issues.apache.org/jira/browse/BEAM-11148
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka, runner-flink
>Affects Versions: 2.25.0
> Environment: jdk1.8
> apache-zookeeper-3.4.14
> hadoop-3.2.1
> flink-1.11.1
>Reporter: fangyu
>Priority: P1
>
> Hi,
> I upgraded Beam from 2.19.0 (flink 1.9) to 2.25.0 (flink 1.11.1),And then it 
> doesn't work。 
>  
> The cluster version I use is:
>     jdk1.8
>     apache-zookeeper-3.4.14
>     hadoop-3.2.1
>     flink-1.11.1
>  
> Submit job use command:
> {code}
> bin/flink run -m yarn-cluster -ynm "xxx" -yjm 2048 -ytm 8192 
> ./some-executable.jar \
> --appName=xxxname \
> --runner=FlinkRunner \
> --parallelism=2 \
> --sourceKafkaUrl=192.168.12.13:9092 \
> --sourceTopic=sometopic \
> --sourceGroupId=guofy-host-dev \
> --sinkKafkaUrl=192.168.12.13:9092 \
> --debug=true \
> &
> {code}
>  
> Yarn is ok but taskmanager.log has exceptioins. 
> Kafka comsumer into an infinite loop, and finally report 
> java.lang.OutOfMemoryError: GC overhead limit is exceeded.
> Below is a partial log. Please help to analyze and solve it.
> {code} 
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.clients.Metadata  - [Consumer 
> clientId=consumer-guofy-host-dev-6, groupId=guofy-host-dev] Cluster ID: 
> EVoHjOG8SwG7x5F-8y2cYA
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - 
> Kafka version: 2.6.0
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.clients.Metadata  - [Consumer 
> clientId=consumer-guofy-host-dev-6, groupId=guofy-host-dev] Cluster ID: 
> EVoHjOG8SwG7x5F-8y2cYA
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - 
> Kafka version: 2.6.0
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - 
> Kafka commitId: 62abe01bee039651
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - 
> Kafka commitId: 62abe01bee039651
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - 
> Kafka startTimeMs: 1603806859685
> 2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - 
> Kafka startTimeMs: 1603806859685
> 2020-10-27 21:54:19.686 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
>  - [Consumer 
> clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, 
> groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Subscribed to 
> partition(s): guofangyu-vm-dev-0
> 2020-10-27 21:54:19.686 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
>  - [Consumer 
> clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, 
> groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Subscribed to 
> partition(s): guofangyu-vm-dev-0
> 2020-10-27 21:54:19.686 INFO  
> o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer 
> clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, 
> groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Seeking to LATEST 
> offset of partition g
> uofangyu-vm-dev-0
> 2020-10-27 21:54:19.686 INFO  
> o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer 
> clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, 
> groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Seeking to LATEST 
> offset of partition g
> uofangyu-vm-dev-0
> 2020-10-27 21:54:19.688 INFO  org.apache.kafka.clients.Metadata  - [Consumer 
> clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, 
> groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Cluster ID: 
> EVoHjOG8SwG7x5F-8y2cYA
> 2020-10-27 21:54:19.688 INFO  org.apache.kafka.clients.Metadata  - [Consumer 
> clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, 
> groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Cluster ID: 
> EVoHjOG8SwG7x5F-8y2cYA
> 2020-10-27 21:54:19.690 INFO  
> o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer 
> clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, 
> groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Resetting offset 
> for partition guofangy
> u-vm-dev-0 to off

[jira] [Created] (BEAM-12114) Eliminate beam_fn_api from KafkaIO expansion

2021-04-06 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-12114:
---

 Summary: Eliminate beam_fn_api from KafkaIO expansion
 Key: BEAM-12114
 URL: https://issues.apache.org/jira/browse/BEAM-12114
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Boyuan Zhang
Assignee: Boyuan Zhang


We are going to use the Splittable DoFn expansion by default for KafkaIO 
without looking into the beam_fn_api flag. But KafkaIO provides overrides for 
runners that decide to not use such expansion. Pipeline authors can also use 
beam_fn_api_use_deprecated_read to switch to the old expansion.
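
For pipeline authors, a minimal sketch of switching back via that experiment:

{code:java}
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Opt back into the old Read expansion via the experiment named above.
PipelineOptions options =
    PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api_use_deprecated_read")
        .create();
{code}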



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-10663) CrossLanguageKafkaIOTest broken on Flink Runner

2021-04-02 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-10663:

Resolution: Fixed
Status: Resolved  (was: Open)

> CrossLanguageKafkaIOTest broken on Flink Runner
> ---
>
> Key: BEAM-10663
> URL: https://issues.apache.org/jira/browse/BEAM-10663
> Project: Beam
>  Issue Type: Bug
>  Components: cross-language, io-py-kafka, test-failures
>Affects Versions: 2.24.0
>Reporter: Piotr Szuberski
>Assignee: Brian Hulette
>Priority: P1
>  Time Spent: 12h 40m
>  Remaining Estimate: 0h
>
> Python postcommits fail on CrossLanguageKafkaIO python tests after #11749 
> (BEAM-9977) merge.
>  
> Fragment of stacktrace:
> ```
> {{Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 2: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could 
> not find a way to create AutoValue class class 
> org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor
>   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
>   at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:91)
>   at 
> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:342)
>   at 
> org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:108)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:302)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)}}
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-10663) CrossLanguageKafkaIOTest broken on Flink Runner

2021-04-02 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314000#comment-17314000
 ] 

Boyuan Zhang commented on BEAM-10663:
-

Sorry, I made a mistake in the KafkaIO expansion when I enabled the SDF 
implementation as the default. Let me put up a quick fix for that.

> CrossLanguageKafkaIOTest broken on Flink Runner
> ---
>
> Key: BEAM-10663
> URL: https://issues.apache.org/jira/browse/BEAM-10663
> Project: Beam
>  Issue Type: Bug
>  Components: cross-language, io-py-kafka, test-failures
>Affects Versions: 2.24.0
>Reporter: Piotr Szuberski
>Assignee: Brian Hulette
>Priority: P1
>  Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> Python postcommits fail on CrossLanguageKafkaIO python tests after #11749 
> (BEAM-9977) merge.
>  
> Fragment of stacktrace:
> ```
> {{Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 2: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could 
> not find a way to create AutoValue class class 
> org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor
>   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
>   at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:91)
>   at 
> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:342)
>   at 
> org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:108)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:302)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)}}
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8218) Implement Apache PulsarIO

2021-03-30 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311852#comment-17311852
 ] 

Boyuan Zhang commented on BEAM-8218:


Hi [~mxm], 

Did you happen to make some progress on this? Would you like to share your 
draft, as you mentioned before?

> Implement Apache PulsarIO
> -
>
> Key: BEAM-8218
> URL: https://issues.apache.org/jira/browse/BEAM-8218
> Project: Beam
>  Issue Type: Task
>  Components: io-ideas
>Reporter: Alex Van Boxel
>Priority: P3
>
> Apache Pulsar is starting to gain popularity. Having a native Beam PulsarIO 
> could be beneficial.
> [https://pulsar.apache.org/|https://pulsar.apache.org/en/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11996) Implement SpannerIO on top of Splittable DoFn

2021-03-30 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311843#comment-17311843
 ] 

Boyuan Zhang commented on BEAM-11996:
-

h3. Goal
Convert 
[BatchSpannerRead|https://github.com/apache/beam/blob/a16bbf78bb5b3d3a14d13fb39ed442c612d0b493/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java#L45]
 into a Splittable DoFn, and investigate whether it makes sense to convert 
[NaiveSpannerReadFn|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java#L73]
 into a Splittable DoFn as well. 

h3. Details
The Splittable DoFn implementation should be a combination of 
[GeneratePartitionsFn + 
ReadFromPartitionFn|https://github.com/apache/beam/blob/a16bbf78bb5b3d3a14d13fb39ed442c612d0b493/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java#L73-L79]

> Implement SpannerIO on top of Splittable DoFn
> -
>
> Key: BEAM-11996
> URL: https://issues.apache.org/jira/browse/BEAM-11996
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Boyuan Zhang
>Priority: P2
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11997) Implement RedisIO on top of Splittable DoFn

2021-03-30 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311809#comment-17311809
 ] 

Boyuan Zhang commented on BEAM-11997:
-

h3. Goal
Convert 
[ReadFn|https://github.com/apache/beam/blob/a16bbf78bb5b3d3a14d13fb39ed442c612d0b493/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L468]
 from a simple DoFn to an SDF

h3. Examples
[ParquetIO.SplitReadFn|https://github.com/apache/beam/blob/a16bbf78bb5b3d3a14d13fb39ed442c612d0b493/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L794]

> Implement RedisIO on top of Splittable DoFn
> ---
>
> Key: BEAM-11997
> URL: https://issues.apache.org/jira/browse/BEAM-11997
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-redis
>Reporter: Boyuan Zhang
>Priority: P2
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11995) Implement FileIO/TextIO on top of Splittable DoFn

2021-03-30 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311797#comment-17311797
 ] 

Boyuan Zhang commented on BEAM-11995:
-

h3. Goal
Current 
[FileIO|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java]
 and 
[TextIO|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java]
 read 
[FileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java]
 via 
[ReadAllViaFileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java].
 We want to turn ReadAllViaFileBasedSource into an SDF implementation to gain 
the benefits of dynamic splitting.

h3. Details
[ReadAllViaFileBasedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java]
 is a composite transform which expands into "Split into ranges" -> "Reshuffle" 
-> "Read ranges": 
[code|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java#L63-L67].
 The "Read ranges" step still uses the 
[BoundedSource|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java]
 and 
[BoundedReader|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L108]
 APIs to read from the file. When converting the ReadAllViaFileBasedSource 
transform into an SDF implementation, we can still use the BoundedSource and 
BoundedReader APIs to read the content, since these are already hooked up with 
multiple kinds of files: 
[query|https://github.com/apache/beam/search?q=ReadAllViaFileBasedSource]. 

Another option is to build an SDF read for every FileBasedSource: 
[AvroSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java],
 
[CompressedSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java],
 
[TextSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java]
 and 
[TFRecordSource|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java#L488].
 The SDF should take a ReadableFile as input and emit the file contents. Then 
we can replace all ReadAllViaFileBasedSource usages with this SDF 
implementation (see the skeleton below).
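
As an illustration of this second option, a hypothetical skeleton (the class 
name is a placeholder and the record-reading logic is elided):

{code:java}
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Hypothetical skeleton: an SDF that takes a ReadableFile and emits records,
// using an OffsetRange restriction so the runner can split dynamically.
class ReadFileAsSdfFn extends DoFn<FileIO.ReadableFile, String> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element FileIO.ReadableFile file) {
    return new OffsetRange(0, file.getMetadata().sizeBytes());
  }

  @ProcessElement
  public void processElement(
      @Element FileIO.ReadableFile file,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) throws Exception {
    // Open the file, seek to tracker.currentRestriction().getFrom(), then for
    // each record call tracker.tryClaim(recordStartOffset) before emitting it
    // with out.output(...).
  }
}
{code}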

h3. Code examples
* Build SDF based on BoundedSource and BoundedReader API: 
[BoundedSourceAsSDFWrapperFn|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L261]
* Build SDF to replace BoundedSource/BoundedReader: 
[ParquetIO|https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L716]

> Implement FileIO/TextIO on top of Splittable DoFn
> -
>
> Key: BEAM-11995
> URL: https://issues.apache.org/jira/browse/BEAM-11995
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Reporter: Boyuan Zhang
>Priority: P2
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11995) Implement FileIO/TextIO on top of Splittable DoFn

2021-03-30 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-11995:

Summary: Implement FileIO/TextIO on top of Splittable DoFn  (was: Implement 
FileIO on top of Splittable DoFn)

> Implement FileIO/TextIO on top of Splittable DoFn
> -
>
> Key: BEAM-11995
> URL: https://issues.apache.org/jira/browse/BEAM-11995
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Reporter: Boyuan Zhang
>Priority: P2
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11998) Portable runners should be able to issue checkpoints to Splittable DoFn

2021-03-30 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311703#comment-17311703
 ] 

Boyuan Zhang commented on BEAM-11998:
-

The easiest way is to issue ProcessBundleSplitRequest(fraction_of_remainder = 
0) from the runner in an output-bounded or time-bounded manner (see the sketch 
after this list):
* The runner should have a mechanism to issue ProcessBundleSplitRequest 
regularly. If it's done in a time-bounded and output-bounded manner, the logic 
could be similar to 
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 and the implementation can be shared among different Java runners.
* A custom BundleSplitHandler should be created to handle the split response. 
If residuals are rescheduled via timers and state, the logic could be similar 
to StateAndTimerBundleCheckpointHandler: 
https://github.com/apache/beam/blob/a16bbf78bb5b3d3a14d13fb39ed442c612d0b493/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java#L53
 and the implementation can be shared among different Java runners.
* Each runner needs to be hooked up with these two components, including 
holding the watermark correctly and persisting and rescheduling residuals as 
expected.
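
For concreteness, a minimal sketch of the checkpoint request itself, built 
from the Fn API protos (the instruction and transform ids are placeholders):

{code:java}
import org.apache.beam.model.fnexecution.v1.BeamFnApi;

// fraction_of_remainder == 0 asks the SDK to stop processing the current
// bundle as soon as possible, i.e. a checkpoint. Ids are placeholders.
BeamFnApi.ProcessBundleSplitRequest checkpointRequest =
    BeamFnApi.ProcessBundleSplitRequest.newBuilder()
        .setInstructionId("bundle-1")
        .putDesiredSplits(
            "read-ptransform",
            BeamFnApi.ProcessBundleSplitRequest.DesiredSplit.newBuilder()
                .setFractionOfRemainder(0.0)
                .setEstimatedInputElements(1)
                .build())
        .build();
{code}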

> Portable runners should be able to issue checkpoints to Splittable DoFn
> ---
>
> Key: BEAM-11998
> URL: https://issues.apache.org/jira/browse/BEAM-11998
> Project: Beam
>  Issue Type: New Feature
>  Components: cross-language, runner-flink, runner-spark
>Reporter: Boyuan Zhang
>Priority: P2
>
> To execute unbounded Splittable DoFn over fnapi in streaming mode properly, 
> portable runners should issue split(ProcessBundleSplitRequest with 
> fraction_of_remainder > 0) or simply checkpoint(ProcessBundleSplitRequest 
> with fraction_of_remainder == 0) to SDK regularly to make current bundle 
> finished processing instead of running forever.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-10995) Java + Universal Local Runner: WindowingTest.testWindowPreservation fails

2021-03-29 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310895#comment-17310895
 ] 

Boyuan Zhang commented on BEAM-10995:
-

No: 
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/runners/portability/java/build.gradle#L196

> Java + Universal Local Runner: WindowingTest.testWindowPreservation fails
> -
>
> Key: BEAM-10995
> URL: https://issues.apache.org/jira/browse/BEAM-10995
> Project: Beam
>  Issue Type: Bug
>  Components: runner-universal, sdk-java-harness
>Reporter: Kenneth Knowles
>Priority: P3
>  Labels: Clarified, flake
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12008) KafkaIO does not handle null keys

2021-03-29 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310850#comment-17310850
 ] 

Boyuan Zhang commented on BEAM-12008:
-

The issue here is that KafkaRecordCoder doesn't handle a null key: the 
key/value pair is represented with a KvCoder. In particular, if the key is 
byte[], ByteArrayCoder doesn't accept a null array as input.
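
A minimal sketch of the usual remedy, wrapping the key coder in NullableCoder 
so a null key can round-trip:

{code:java}
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.values.KV;

// Wrapping the key coder in NullableCoder lets a null Kafka key round-trip
// instead of failing inside ByteArrayCoder.
Coder<KV<byte[], byte[]>> coder =
    KvCoder.of(NullableCoder.of(ByteArrayCoder.of()), ByteArrayCoder.of());
{code}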

> KafkaIO does not handle null keys
> -
>
> Key: BEAM-12008
> URL: https://issues.apache.org/jira/browse/BEAM-12008
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-kafka
>Reporter: Daniel Collins
>Priority: P2
>
> Kafka 
> [ConsumerRecord|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html#key--]
>  and 
> [ProducerRecord|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#key--]
>  'key' fields are explicitly allowed to be null. In addition, on the producer 
> side, setting a null key is the way that the user indicates that they want a 
> [random partition for their 
> message|[https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L67].]
>  
> Beam KafkaIO does not support null keys in byte[] mode (read side: 
> [https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L727|https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L727)]
> write side: 
> [https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java#L58])
>  
> since it would defer to ByteArrayCoder which does not support null arrays.
>  
> BeamKafkaTable suffers the same issue 
> https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java#L144



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-12021) PubsubReadIT failures: "Cannot nackAll on persisting checkpoint"

2021-03-19 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-12021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang reassigned BEAM-12021:
---

Assignee: Kenneth Knowles  (was: Boyuan Zhang)

> PubsubReadIT failures: "Cannot nackAll on persisting checkpoint"
> 
>
> Key: BEAM-12021
> URL: https://issues.apache.org/jira/browse/BEAM-12021
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp, test-failures
>Reporter: Tyson Hamilton
>Assignee: Kenneth Knowles
>Priority: P1
>  Labels: currently-failing
>
> * 
> [org.apache.beam.sdk.io.gcp.pubsub.PubsubReadIT.testReadPubsubMessageId|https://ci-beam.apache.org/job/beam_PostCommit_Java/7332/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/testReadPubsubMessageId_2/]
>  * 
> [org.apache.beam.sdk.io.gcp.pubsub.PubsubReadIT.testReadPublicData|https://ci-beam.apache.org/job/beam_PostCommit_Java/7332/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/testReadPublicData_2/]
> Job:
> [https://console.cloud.google.com/dataflow/jobs/us-central1/2021-03-17_14_02_29-12611938587996322031?project=apache-beam-testing]
>  
> Many worker errors (754) that look like:
> {code:java}
> 2021-03-17 20:47:29.000 PDTError message from worker: generic::unknown: 
> java.lang.IllegalStateException: Cannot nackAll on persisting checkpoint 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
>  
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubCheckpoint.nackAll(PubsubUnboundedSource.java:308)
>  
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.createReader(PubsubUnboundedSource.java:1071)
>  
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.createReader(PubsubUnboundedSource.java:1012)
>  
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:963)
>  
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:426)
>  
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>  Source) 
> org.apache.beam.fn.harness.FnApiDoFnRunner.calculateRestrictionSize(FnApiDoFnRunner.java:1182)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner.trySplitForElementAndRestriction(FnApiDoFnRunner.java:1608)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1059)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638) 
> org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633) 
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:246)
>  
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:200)
>  
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
>  
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
>  
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
>  
> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)
>  
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:308)
>  
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>  
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  java.lang.Thread.run(Thread.java:748) passed through: ==> 
> dist_proc/dax/workflow/worker/fnapi_service.cc:631
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12021) PubsubReadIT failures: "Cannot nackAll on persisting checkpoint"

2021-03-19 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305259#comment-17305259
 ] 

Boyuan Zhang commented on BEAM-12021:
-

https://github.com/apache/beam/pull/14276 should fix this issue: 
beam.apache.org/job/beam_PostCommit_Java_PR/623/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/

> PubsubReadIT failures: "Cannot nackAll on persisting checkpoint"
> 
>
> Key: BEAM-12021
> URL: https://issues.apache.org/jira/browse/BEAM-12021
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp, test-failures
>Reporter: Tyson Hamilton
>Assignee: Boyuan Zhang
>Priority: P1
>  Labels: currently-failing
>
> * 
> [org.apache.beam.sdk.io.gcp.pubsub.PubsubReadIT.testReadPubsubMessageId|https://ci-beam.apache.org/job/beam_PostCommit_Java/7332/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/testReadPubsubMessageId_2/]
>  * 
> [org.apache.beam.sdk.io.gcp.pubsub.PubsubReadIT.testReadPublicData|https://ci-beam.apache.org/job/beam_PostCommit_Java/7332/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/testReadPublicData_2/]
> Job:
> [https://console.cloud.google.com/dataflow/jobs/us-central1/2021-03-17_14_02_29-12611938587996322031?project=apache-beam-testing]
>  
> Many worker errors (754) that look like:
> {code:java}
> 2021-03-17 20:47:29.000 PDTError message from worker: generic::unknown: 
> java.lang.IllegalStateException: Cannot nackAll on persisting checkpoint 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
>  
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubCheckpoint.nackAll(PubsubUnboundedSource.java:308)
>  
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.createReader(PubsubUnboundedSource.java:1071)
>  
> org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.createReader(PubsubUnboundedSource.java:1012)
>  
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:963)
>  
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:426)
>  
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>  Source) 
> org.apache.beam.fn.harness.FnApiDoFnRunner.calculateRestrictionSize(FnApiDoFnRunner.java:1182)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner.trySplitForElementAndRestriction(FnApiDoFnRunner.java:1608)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1059)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
>  
> org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638) 
> org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633) 
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:246)
>  
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:200)
>  
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
>  
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
>  
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
>  
> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)
>  
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:308)
>  
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>  
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  java.lang.Thread.run(Thread.java:748) passed through: ==> 
> dist_proc/dax/workflow/worker/fnapi_service.cc:631
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-10663) CrossLanguageKafkaIOTest broken on Flink Runner

2021-03-17 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303613#comment-17303613
 ] 

Boyuan Zhang edited comment on BEAM-10663 at 3/17/21, 5:37 PM:
---

I don't think this test will be broken owing to 
https://issues.apache.org/jira/browse/BEAM-11998, because it specifies 
max_num_records, so in Java the read goes through the composite transform 
BoundedReadFromUnboundedSource. It will not go through the SDF code path.


was (Author: boyuanz):
I don't think this test will be broken owing to 
https://issues.apache.org/jira/browse/BEAM-11998, because it specifies 
max_num_records, so in Java the read goes through 
BoundedReadFromUnboundedSource, which is a composite transform. It will not go 
through the SDF code path.

> CrossLanguageKafkaIOTest broken on Flink Runner
> ---
>
> Key: BEAM-10663
> URL: https://issues.apache.org/jira/browse/BEAM-10663
> Project: Beam
>  Issue Type: Bug
>  Components: cross-language, io-py-kafka, test-failures
>Affects Versions: 2.24.0
>Reporter: Piotr Szuberski
>Assignee: Brian Hulette
>Priority: P1
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> Python postcommits fail on CrossLanguageKafkaIO python tests after #11749 
> (BEAM-9977) merge.
>  
> Fragment of the stack trace:
> ```
> {{Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 2: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could 
> not find a way to create AutoValue class class 
> org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor
>   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
>   at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:91)
>   at 
> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:342)
>   at 
> org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:108)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:302)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)}}
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-10663) CrossLanguageKafkaIOTest broken on Flink Runner

2021-03-17 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303613#comment-17303613
 ] 

Boyuan Zhang commented on BEAM-10663:
-

I don't think this test will be broken owing to 
https://issues.apache.org/jira/browse/BEAM-11998, because it specifies 
max_num_records, so in Java the read goes through 
BoundedReadFromUnboundedSource, which is a composite transform. It will not go 
through the SDF code path.

> CrossLanguageKafkaIOTest broken on Flink Runner
> ---
>
> Key: BEAM-10663
> URL: https://issues.apache.org/jira/browse/BEAM-10663
> Project: Beam
>  Issue Type: Bug
>  Components: cross-language, io-py-kafka, test-failures
>Affects Versions: 2.24.0
>Reporter: Piotr Szuberski
>Assignee: Brian Hulette
>Priority: P1
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> Python postcommits fail on CrossLanguageKafkaIO python tests after #11749 
> (BEAM-9977) merge.
>  
> Fragment of the stack trace:
> ```
> {{Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 2: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could 
> not find a way to create AutoValue class class 
> org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor
>   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
>   at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:91)
>   at 
> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:342)
>   at 
> org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:108)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:302)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)}}
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11998) Portable runners should be able to issue checkpoints to Splittable DoFn

2021-03-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302976#comment-17302976
 ] 

Boyuan Zhang commented on BEAM-11998:
-

Code pointer for how we execute an unbounded Splittable DoFn in non-portable 
execution: 
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
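
A simplified sketch of the pattern that invoker implements (the loop shape 
only; tracker, emit, read, and reschedule are placeholders, not the actual 
invoker API): each ProcessElement call is bounded by an output count and a 
deadline, after which the tracker is asked to checkpoint via trySplit(0):

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
AtomicBoolean deadlineHit = new AtomicBoolean(false);
executor.schedule(() -> deadlineHit.set(true), 10, TimeUnit.SECONDS);

long outputs = 0;
long offset = tracker.currentRestriction().getFrom();
while (tracker.tryClaim(offset)) {
  emit(read(offset++));  // placeholder I/O
  if (++outputs >= 10_000 || deadlineHit.get()) {
    // trySplit(0) is a checkpoint: the primary keeps only what was claimed and
    // everything unclaimed becomes a residual restriction to reschedule.
    SplitResult<OffsetRange> split = tracker.trySplit(0);
    reschedule(split.getResidual());  // placeholder hand-off to the runner
    break;
  }
}
executor.shutdownNow();
{code}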

> Portable runners should be able to issue checkpoints to Splittable DoFn
> ---
>
> Key: BEAM-11998
> URL: https://issues.apache.org/jira/browse/BEAM-11998
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink, runner-spark
>Reporter: Boyuan Zhang
>Priority: P2
>
> To execute an unbounded Splittable DoFn over the fnapi in streaming mode 
> properly, portable runners should regularly issue a split 
> (ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a 
> checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to the 
> SDK, so that the current bundle finishes processing instead of running forever.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11998) Portable runners should be able to issue checkpoints to Splittable DoFn

2021-03-16 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11998:
---

 Summary: Portable runners should be able to issue checkpoints to 
Splittable DoFn
 Key: BEAM-11998
 URL: https://issues.apache.org/jira/browse/BEAM-11998
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink, runner-spark
Reporter: Boyuan Zhang


To execute an unbounded Splittable DoFn over the fnapi in streaming mode 
properly, portable runners should regularly issue a split 
(ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a 
checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to the 
SDK, so that the current bundle finishes processing instead of running forever.
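
A minimal sketch of the checkpoint request a runner would send for an 
in-flight bundle. The builder and field names follow my reading of the Beam Fn 
API protos and should be treated as assumptions; activeBundleId and 
readTransformId are placeholder variables:

{code:java}
import org.apache.beam.model.fnexecution.v1.BeamFnApi;

// fraction_of_remainder == 0 asks the SDK to stop at the earliest split point
// and hand back everything unprocessed as residuals, i.e. a checkpoint.
BeamFnApi.ProcessBundleSplitRequest checkpoint =
    BeamFnApi.ProcessBundleSplitRequest.newBuilder()
        .setInstructionId(activeBundleId)  // id of the in-flight ProcessBundleRequest
        .putDesiredSplits(
            readTransformId,  // the SDF's transform id
            BeamFnApi.ProcessBundleSplitRequest.DesiredSplit.newBuilder()
                .setFractionOfRemainder(0)  // 0 == checkpoint, > 0 == proportional split
                .setEstimatedInputElements(1)
                .build())
        .build();
{code}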



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11749) Portable Flink runner skips timers when dynamic timer tags are used

2021-03-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302950#comment-17302950
 ] 

Boyuan Zhang commented on BEAM-11749:
-

Hi, we fixed the dynamic timers support in 
https://issues.apache.org/jira/browse/BEAM-10120. Please check it out to see 
whether it helps.

> Portable Flink runner skips timers when dynamic timer tags are used
> ---
>
> Key: BEAM-11749
> URL: https://issues.apache.org/jira/browse/BEAM-11749
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.27.0
>Reporter: Marek Pikulski
>Priority: P2
> Attachments: timer_test.py
>
>
> Timers in Flink runner do not fire as expected. See attached example code for 
> details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11997) Implement RedisIO on top of Splittable DoFn

2021-03-16 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11997:
---

 Summary: Implement RedisIO on top of Splittable DoFn
 Key: BEAM-11997
 URL: https://issues.apache.org/jira/browse/BEAM-11997
 Project: Beam
  Issue Type: Improvement
  Components: io-java-redis
Reporter: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11996) Implement SpannerIO on top of Splittable DoFn

2021-03-16 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11996:
---

 Summary: Implement SpannerIO on top of Splittable DoFn
 Key: BEAM-11996
 URL: https://issues.apache.org/jira/browse/BEAM-11996
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Reporter: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11995) Implement FileIO on top of Splittable DoFn

2021-03-16 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11995:
---

 Summary: Implement FileIO on top of Splittable DoFn
 Key: BEAM-11995
 URL: https://issues.apache.org/jira/browse/BEAM-11995
 Project: Beam
  Issue Type: Improvement
  Components: io-java-files
Reporter: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11991) Python Kafka source not emitting messages for streaming pipelines with Flink Runner

2021-03-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302752#comment-17302752
 ] 

Boyuan Zhang commented on BEAM-11991:
-

Just tried to refresh my memory. I don't think the portable Flink runner can 
execute unbounded SDFs in streaming properly, because we only have support for 
self-checkpointing: portable streaming Flink doesn't issue checkpoint requests 
to the SDK regularly. That's why all Kafka records are buffered at the current 
stage.
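
For contrast, a minimal sketch of what SDK-initiated ("self") checkpointing 
looks like inside an unbounded SDF: the DoFn itself returns resume(), so no 
runner-issued split request is needed. pollOne is a placeholder for, e.g., a 
Kafka poll:

{code:java}
// Inside an unbounded SDF's DoFn.
@ProcessElement
public ProcessContinuation processElement(
    RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<String> out) {
  for (long offset = tracker.currentRestriction().getFrom(); ; offset++) {
    String record = pollOne(offset);  // placeholder read
    if (record == null) {
      // Nothing available right now: self-checkpoint and resume later. This is
      // the only checkpointing the SDK can do without a runner-issued request.
      return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(5));
    }
    if (!tracker.tryClaim(offset)) {
      return ProcessContinuation.stop();  // restriction finished or split away
    }
    out.output(record);
  }
}
{code}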

> Python Kafka source not emitting messages for streaming pipelines with Flink 
> Runner
> ---
>
> Key: BEAM-11991
> URL: https://issues.apache.org/jira/browse/BEAM-11991
> Project: Beam
>  Issue Type: Bug
>  Components: cross-language, runner-flink
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Boyuan Zhang
>Priority: P1
>
> To reproduce:
>  * Startup a local Kafka cluster: [https://kafka.apache.org/quickstart]
>  * Setup topics:
> bin/kafka-console-consumer.sh --topic mytopic1 --from-beginning 
> --bootstrap-server localhost:9092
> bin/kafka-console-consumer.sh --topic mytopic2 --from-beginning 
> --bootstrap-server localhost:9092
>  * Setup a Beam virtualenv and run a pipeline that reads from Kafka. For 
> example: [https://wtools.io/paste-code/b4je]
> > python ./pipeline.py --bootstrap_servers=localhost:9092 --in_topic=mytopic1 
> > --out_topic=mytopic2 --runner=FlinkRunner --streaming
>  * Publish messages as kv pairs.
>  ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
> mytopic1 --property "parse.key=true" --property "key.separator=:"
>  >a:b
>  >c:d
>  >e:f
>  * Messages do not get pushed to subsequent steps.
>  * Following seems to be working fine.
>      * X-lang Bounded read with Flink
>      * X-lang Kafka sink and with Flink
>   [~boyuanz] could you take a look to rule out any SDF/unbounded read related 
> issues ?
> cc: [~mxm] and [~angoenka] for Flink issues.
>  
> Beam user thread: 
> [https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11991) Python Kafka source not emitting messages for streaming pipelines with Flink Runner

2021-03-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302717#comment-17302717
 ] 

Boyuan Zhang commented on BEAM-11991:
-

I suspect that the problem pipeline is not launched with the streaming=True 
pipeline option, which would cause the SDF to be executed without 
checkpointing. I have asked the dev who reported this issue to verify.
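
For reference, a minimal Java sketch of the equivalent option (the repro 
pipeline here is Python, where the --streaming flag / streaming=True option 
plays the same role); without it the unbounded SDF is treated as bounded and 
never checkpoints:

{code:java}
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
// Force streaming execution so unbounded SDFs are checkpointed regularly.
options.as(StreamingOptions.class).setStreaming(true);
{code}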

> Python Kafka source not emitting messages for streaming pipelines with Flink 
> Runner
> ---
>
> Key: BEAM-11991
> URL: https://issues.apache.org/jira/browse/BEAM-11991
> Project: Beam
>  Issue Type: Bug
>  Components: cross-language, runner-flink
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Boyuan Zhang
>Priority: P1
>
> To reproduce:
>  * Startup a local Kafka cluster: [https://kafka.apache.org/quickstart]
>  * Setup topics:
> bin/kafka-console-consumer.sh --topic mytopic1 --from-beginning 
> --bootstrap-server localhost:9092
> bin/kafka-console-consumer.sh --topic mytopic2 --from-beginning 
> --bootstrap-server localhost:9092
>  * Setup a Beam virtualenv and run a pipeline that reads from Kafka. For 
> example: [https://wtools.io/paste-code/b4je]
> > python ./pipeline.py --bootstrap_servers=localhost:9092 --in_topic=mytopic1 
> > --out_topic=mytopic2 --runner=FlinkRunner --streaming
>  * Publish messages as kv pairs.
>  ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
> mytopic1 --property "parse.key=true" --property "key.separator=:"
>  >a:b
>  >c:d
>  >e:f
>  * Messages do not get pushed to subsequent steps.
>  * Following seems to be working fine.
>      * X-lang Bounded read with Flink
>      * X-lang Kafka sink and with Flink
>   [~boyuanz] could you take a look to rule out any SDF/unbounded read related 
> issues ?
> cc: [~mxm] and [~angoenka] for Flink issues.
>  
> Beam user thread: 
> [https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-10663) CrossLanguageKafkaIOTest broken on Flink Runner

2021-03-16 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302671#comment-17302671
 ] 

Boyuan Zhang commented on BEAM-10663:
-

I think so. 

> CrossLanguageKafkaIOTest broken on Flink Runner
> ---
>
> Key: BEAM-10663
> URL: https://issues.apache.org/jira/browse/BEAM-10663
> Project: Beam
>  Issue Type: Bug
>  Components: cross-language, io-py-kafka, test-failures
>Affects Versions: 2.24.0
>Reporter: Piotr Szuberski
>Priority: P1
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Python postcommits fail on CrossLanguageKafkaIO python tests after #11749 
> (BEAM-9977) merge.
>  
> Fragment of the stack trace:
> ```
> {{Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 2: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could 
> not find a way to create AutoValue class class 
> org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor
>   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
>   at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:91)
>   at 
> org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:342)
>   at 
> org.apache.beam.fn.harness.data.PTransformFunctionRegistry.lambda$register$0(PTransformFunctionRegistry.java:108)
>   at 
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:302)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>   at 
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)}}
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-6868) Flink runner supports Bundle Finalization

2021-03-15 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-6868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302161#comment-17302161
 ] 

Boyuan Zhang commented on BEAM-6868:


I think the remaining issue here is to support bundle finalization in batch for 
portable Flink. But not supporting bundle finalization in batch is not too bad, 
because most use cases are in streaming. Unlike streaming, where we can take 
advantage of Flink checkpoints, in batch it might be difficult to figure out a 
correct mechanism to perform bundle finalization.
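
For reference, a minimal sketch of the API this is about: BundleFinalizer 
registers a callback that runs only after the runner durably commits the 
bundle, which is why streaming Flink can piggyback it on checkpoints. 
commitOffsets is a placeholder side effect:

{code:java}
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class FinalizingFn extends DoFn<String, String> {
  @ProcessElement
  public void process(ProcessContext c, BundleFinalizer finalizer) {
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)),  // expiry for the callback
        () -> commitOffsets(c.element()));  // runs only after the bundle commits
    c.output(c.element());
  }

  private void commitOffsets(String element) {
    // Placeholder for the external side effect, e.g. acking a message queue.
  }
}
{code}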

> Flink runner supports Bundle Finalization
> -
>
> Key: BEAM-6868
> URL: https://issues.apache.org/jira/browse/BEAM-6868
> Project: Beam
>  Issue Type: New Feature
>  Components: cross-language, runner-flink
>Affects Versions: 2.24.0
>Reporter: Boyuan Zhang
>Priority: P2
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11952) WindowMergingFnRunner unnecessarily keeps previous merged results

2021-03-10 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299168#comment-17299168
 ] 

Boyuan Zhang commented on BEAM-11952:
-

Could you please add more details on what this incorrect behavior could lead 
to?

> WindowMergingFnRunner unnecessarily keeps previous merged results
> -
>
> Key: BEAM-11952
> URL: https://issues.apache.org/jira/browse/BEAM-11952
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-harness
>Reporter: Yichi Zhang
>Priority: P2
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11947) KafkaUnboundedReader doesn't support updating pipeline with re-partition case.

2021-03-09 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11947:
---

 Summary: KafkaUnboundedReader doesn't support updating pipeline 
with re-partition case.
 Key: BEAM-11947
 URL: https://issues.apache.org/jira/browse/BEAM-11947
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Boyuan Zhang


When updating a pipeline whose topic has been re-partitioned, 
KafkaUnboundedReader will throw exceptions from 
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L470-L474



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11946) Use ReadFromKafkaDoFn for KafkaIO.Read by default when beam_fn_api is enabled

2021-03-09 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11946:
---

 Summary: Use ReadFromKafkaDoFn for KafkaIO.Read by default when 
beam_fn_api is enabled
 Key: BEAM-11946
 URL: https://issues.apache.org/jira/browse/BEAM-11946
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Boyuan Zhang


There was a data loss bug in Dataflow when executing x-lang pipelines, which 
prevented us from using ReadFromKafkaDoFn for KafkaIO by default. Now that the 
bug is fixed on the Dataflow side, we should use ReadFromKafkaDoFn when 
beam_fn_api is enabled instead of using the SDF wrapper.
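
A minimal sketch of what opting in looks like from the user's side (the 
experiments flag is the standard mechanism; which expansion KafkaIO.Read 
chooses under it is the subject of this issue):

{code:java}
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// With beam_fn_api set, KafkaIO.Read should expand to ReadFromKafkaDoFn
// instead of wrapping KafkaUnboundedSource in the unbounded SDF wrapper.
PipelineOptions options =
    PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
{code}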



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11403) Unbounded SDF wrapper causes performance regression on DirectRunner

2021-03-09 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298290#comment-17298290
 ] 

Boyuan Zhang commented on BEAM-11403:
-

Sorry for the late reply. 
One last improvement I can come up with is to make the checkpoint frequency 
configurable by the pipeline author: 
https://docs.google.com/document/d/18jNLtTyyApx0N2ytp1ytOMmUPLouj2h08N3-4SyWGgQ/edit?usp=sharing.

From what Kaymak reported 
(https://lists.apache.org/thread.html/r3972e2b6016f59af4bea7b3729f5467d0201007b6f217512914c2fb6%40%3Cuser.beam.apache.org%3E), 
it seems like the watermark is still held back with 
https://github.com/apache/beam/pull/14013.
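
As for the configurable-frequency proposal above, a hypothetical sketch of how 
it could surface as a pipeline option; the option name and shape here are my 
assumptions, not a shipped Beam API:

{code:java}
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical option surface for the runner-issued checkpoint frequency.
public interface SdfCheckpointOptions extends PipelineOptions {
  @Description("How often, in seconds, the runner issues checkpoints to unbounded SDFs.")
  @Default.Integer(10)
  Integer getSdfCheckpointIntervalSeconds();

  void setSdfCheckpointIntervalSeconds(Integer value);
}
{code}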

> Unbounded SDF wrapper causes performance regression on DirectRunner
> ---
>
> Key: BEAM-11403
> URL: https://issues.apache.org/jira/browse/BEAM-11403
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct, sdk-java-core
>Reporter: Boyuan Zhang
>Priority: P2
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> There is a significant performance regression when switching from 
> UnboundedSource to Unbounded SDF wrapper. So far there are 2 IOs reported:
> * Pubsub Read: 
> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
> * Kafka Read: https://the-asf.slack.com/archives/C9H0YNP3P/p1606155042346600



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11325) KafkaIO should be able to read from new added topic/partition automatically during pipeline execution time

2021-03-09 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298285#comment-17298285
 ] 

Boyuan Zhang commented on BEAM-11325:
-

The missing part is an E2E test. I should be able to add it by the end of 
March. Or we could create a new issue to track this. What do you think?

> KafkaIO should be able to read from new added topic/partition automatically 
> during pipeline execution time
> --
>
> Key: BEAM-11325
> URL: https://issues.apache.org/jira/browse/BEAM-11325
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Boyuan Zhang
>Priority: P2
>  Labels: stale-P2
>  Time Spent: 7.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-10861) Adds URNs and payloads to PubSub transforms

2021-03-05 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang reassigned BEAM-10861:
---

Assignee: Boyuan Zhang  (was: Kenneth Knowles)

> Adds URNs and payloads to PubSub transforms
> ---
>
> Key: BEAM-10861
> URL: https://issues.apache.org/jira/browse/BEAM-10861
> Project: Beam
>  Issue Type: New Feature
>  Components: cross-language, runner-dataflow, sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Boyuan Zhang
>Priority: P1
>  Time Spent: 21h 50m
>  Remaining Estimate: 0h
>
> This is needed to allow runners to override portable definition of transforms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11879) SAMPLED_BYTE_SIZE counter should consider recording size per window

2021-02-26 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11879:
---

 Summary: SAMPLED_BYTE_SIZE counter should consider recording size 
per window
 Key: BEAM-11879
 URL: https://issues.apache.org/jira/browse/BEAM-11879
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-harness
Reporter: Boyuan Zhang


Currently, SAMPLED_BYTE_SIZE records the size per WindowedValue. We should 
consider recording the size per window once we have the window optimization 
(one WindowedValue can carry multiple windows, where possible, to reduce 
metadata and processing overhead).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11752) Using LoadingCache instead of Map to cache BundleProcessor

2021-02-08 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-11752:

Fix Version/s: 2.29.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Using LoadingCache instead of Map to cache BundleProcessor
> --
>
> Key: BEAM-11752
> URL: https://issues.apache.org/jira/browse/BEAM-11752
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P2
> Fix For: 2.29.0
>
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11776) DoFn lifecycle in programming guide should be updated with portable execution

2021-02-08 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11776:
---

 Summary: DoFn lifecycle in programming guide should be updated 
with portable execution
 Key: BEAM-11776
 URL: https://issues.apache.org/jira/browse/BEAM-11776
 Project: Beam
  Issue Type: Improvement
  Components: website
Reporter: Boyuan Zhang


The DoFn lifecycle section of the programming guide should be updated to 
reflect portable execution.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11752) Using LoadingCache instead of Map to cache BundleProcessor

2021-02-04 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11752:
---

 Summary: Using LoadingCache instead of Map to cache BundleProcessor
 Key: BEAM-11752
 URL: https://issues.apache.org/jira/browse/BEAM-11752
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-harness
Reporter: Boyuan Zhang
Assignee: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11734) ReadFromKafkaDoFn should perform self-checkpoint when there is no offset returned by offsetForTime

2021-02-01 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11734:
---

 Summary: ReadFromKafkaDoFn should perform self-checkpoint when 
there is no offset returned by offsetForTime
 Key: BEAM-11734
 URL: https://issues.apache.org/jira/browse/BEAM-11734
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-10120) Support Dynamic Timers in the Flink Portable Runner

2021-01-28 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-10120:

Fix Version/s: 2.29.0
   Resolution: Fixed
   Status: Resolved  (was: Triage Needed)

> Support Dynamic Timers in the Flink Portable Runner
> ---
>
> Key: BEAM-10120
> URL: https://issues.apache.org/jira/browse/BEAM-10120
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P3
> Fix For: 2.29.0
>
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11687) Certain exceptions in SDF wrapper should not fail the bundle directly

2021-01-25 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11687:
---

 Summary: Certain exceptions in SDF wrapper should not fail the 
bundle directly
 Key: BEAM-11687
 URL: https://issues.apache.org/jira/browse/BEAM-11687
 Project: Beam
  Issue Type: Improvement
  Components: java-fn-execution
Reporter: Boyuan Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11678) beam_PerformanceTests_Kafka_IO is broken by incorrect docker image cleanup

2021-01-25 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-11678:

Fix Version/s: 2.28.0
   Resolution: Fixed
   Status: Resolved  (was: Triage Needed)

> beam_PerformanceTests_Kafka_IO is broken by incorrect docker image cleanup
> --
>
> Key: BEAM-11678
> URL: https://issues.apache.org/jira/browse/BEAM-11678
> Project: Beam
>  Issue Type: Improvement
>  Components: test-failures
>Reporter: Boyuan Zhang
>Assignee: Emily Ye
>Priority: P2
> Fix For: 2.28.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/pull/13211
> https://ci-beam.apache.org/job/beam_PerformanceTests_Kafka_IO/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11678) beam_PerformanceTests_Kafka_IO is broken by incorrect docker image cleanup

2021-01-22 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11678:
---

 Summary: beam_PerformanceTests_Kafka_IO is broken by incorrect 
docker image cleanup
 Key: BEAM-11678
 URL: https://issues.apache.org/jira/browse/BEAM-11678
 Project: Beam
  Issue Type: Improvement
  Components: test-failures
Reporter: Boyuan Zhang
Assignee: Emily Ye


https://github.com/apache/beam/pull/13211
https://ci-beam.apache.org/job/beam_PerformanceTests_Kafka_IO/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11677) Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka

2021-01-22 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-11677:

Fix Version/s: 2.28.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka
> --
>
> Key: BEAM-11677
> URL: https://issues.apache.org/jira/browse/BEAM-11677
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-kafka
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P2
> Fix For: 2.28.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> See discussion: 
> https://lists.apache.org/thread.html/rece5a93dfd3a26e67440147a46f1ff1f6f7803bd57db22266d46375c%40%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11677) Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka

2021-01-22 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11677:
---

 Summary: Expose commit_offset_in_finalize and timestamp_policy to 
ReadFromKafka
 Key: BEAM-11677
 URL: https://issues.apache.org/jira/browse/BEAM-11677
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Boyuan Zhang
Assignee: Boyuan Zhang


See discussion: 
https://lists.apache.org/thread.html/rece5a93dfd3a26e67440147a46f1ff1f6f7803bd57db22266d46375c%40%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-10120) Support Dynamic Timers in the Flink Portable Runner

2021-01-21 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang reassigned BEAM-10120:
---

Assignee: Boyuan Zhang

> Support Dynamic Timers in the Flink Portable Runner
> ---
>
> Key: BEAM-10120
> URL: https://issues.apache.org/jira/browse/BEAM-10120
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P3
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-11633) Steer people towards ParDo, SDF, instead of the original Source framework

2021-01-12 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263826#comment-17263826
 ] 

Boyuan Zhang edited comment on BEAM-11633 at 1/13/21, 1:36 AM:
---

Deprecating the source framework is definitely the thing to do. That's also the 
final step we want to take for GA. But before we are able to do that, we need 
to make sure every source works fine on the SDF structure, typically in Java 
working with the SDF wrapper. Here are some known issues so far:
https://issues.apache.org/jira/browse/BEAM-11403
https://issues.apache.org/jira/browse/BEAM-11328




was (Author: boyuanz):
Deprecating the source framework is definitely the thing to do. That's also the 
final step we want to take for GA. But before we are able to do that, we need 
to make sure every source works fine on the SDF structure, typically in Java 
working with the SDF wrapper. Here are some known issues so far:
https://issues.apache.org/jira/browse/BEAM-11403
https://issues.apache.org/jira/browse/BEAM-11328
  

> Steer people towards ParDo, SDF, instead of the original Source framework
> -
>
> Key: BEAM-11633
> URL: https://issues.apache.org/jira/browse/BEAM-11633
> Project: Beam
>  Issue Type: Bug
>  Components: website
>Reporter: Ahmet Altay
>Assignee: Boyuan Zhang
>Priority: P2
>
> People still write sources, where 90% of the time they shouldn't. We tell 
> them [not 
> to|https://beam.apache.org/documentation/io/developing-io-overview/], but we 
> should do so more effectively. In particular, the instructions for the ParDo 
> alternative suffer from not being able to name Reshuffle explicitly, when 
> it's exactly what should be used here. It should also mention that the ParDo 
> needs to be seeded by a Create step or similar.
> A big issue here is that Sources are called "Sources". When a new developer 
> is looking to author a pipeline, this is the first place they will look, 
> especially if they're just scanning or searching through documentation. We 
> need to aggressively counteract the gravity of the current naming scheme.
> Suggestion: Improve the documentation mentioned above, and update the Javadoc 
> for BoundedSource, etc., to steer people away from it. If they are part of 
> the small collection of power users who need a source, they'll be okay.
> Suggestions for future work:
> - Consider deprecating source framework in favor of SDF.
> - Point to SDF docs (and simplify SDF docs)
> - Also many users can simply just use FileIO.matchAll followed by a ParDo. 
> Recommend those types of alternatives.
> Assigning this to [~boyuanz] anyone could help here.
> /cc [~kenn][~chamikara][~reuvenlax][~robertwb][~rtnguyen]
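
A minimal sketch of the Create-seeded ParDo pattern recommended in the issue 
above; SplitFn and ReadFn are placeholders for user code, and sourceConfigs is 
a placeholder collection of read configurations:

{code:java}
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

// Seed with Create, split into work units, break fusion with Reshuffle so the
// splits can be redistributed and checkpointed, then do the actual reads.
pipeline
    .apply(Create.of(sourceConfigs))
    .apply("Split", ParDo.of(new SplitFn()))
    .apply(Reshuffle.viaRandomKey())
    .apply("Read", ParDo.of(new ReadFn()));
{code}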



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11633) Steer people towards ParDo, SDF, instead of the original Source framework

2021-01-12 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263826#comment-17263826
 ] 

Boyuan Zhang commented on BEAM-11633:
-

Deprecating the source framework is definitely the thing to do. That's also the 
final step we want to take for GA. But before we are able to do that, we need 
to make sure every source works fine on the SDF structure, typically in Java 
working with the SDF wrapper. Here are some known issues so far:
https://issues.apache.org/jira/browse/BEAM-11403
https://issues.apache.org/jira/browse/BEAM-11328
  

> Steer people towards ParDo, SDF, instead of the original Source framework
> -
>
> Key: BEAM-11633
> URL: https://issues.apache.org/jira/browse/BEAM-11633
> Project: Beam
>  Issue Type: Bug
>  Components: website
>Reporter: Ahmet Altay
>Assignee: Boyuan Zhang
>Priority: P2
>
> People still write sources, where 90% of the time they shouldn't. We tell 
> them [not 
> to|https://beam.apache.org/documentation/io/developing-io-overview/], but we 
> should do so more effectively. In particular, the instructions for the ParDo 
> alternative suffer from not being able to name Reshuffle explicitly, when 
> it's exactly what should be used here. It should also mention that the ParDo 
> needs to be seeded by a Create step or similar.
> A big issue here is that Sources are called "Sources". When a new developer 
> is looking to author a pipeline, this is the first place they will look, 
> especially if they're just scanning or searching through documentation. We 
> need to aggressively counteract the gravity of the current naming scheme.
> Suggestion: Improve the documentation mentioned above, and update the Javadoc 
> for BoundedSource, etc., to steer people away from it. If they are part of 
> the small collection of power users who need a source, they'll be okay.
> Suggestions for future work:
> - Consider deprecating source framework in favor of SDF.
> - Point to SDF docs (and simplify SDF docs)
> - Also many users can simply just use FileIO.matchAll followed by a ParDo. 
> Recommend those types of alternatives.
> Assigning this to [~boyuanz] anyone could help here.
> /cc [~kenn][~chamikara][~reuvenlax][~robertwb][~rtnguyen]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11403) Unbounded SDF wrapper causes performance regression on DirectRunner

2021-01-12 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263797#comment-17263797
 ] 

Boyuan Zhang commented on BEAM-11403:
-

Proposed to make the runner-issued checkpoint frequency configurable: 
https://docs.google.com/document/d/18jNLtTyyApx0N2ytp1ytOMmUPLouj2h08N3-4SyWGgQ/edit?usp=sharing

> Unbounded SDF wrapper causes performance regression on DirectRunner
> ---
>
> Key: BEAM-11403
> URL: https://issues.apache.org/jira/browse/BEAM-11403
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct, sdk-java-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P2
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> There is a significant performance regression when switching from 
> UnboundedSource to Unbounded SDF wrapper. So far there are 2 IOs reported:
> * Pubsub Read: 
> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
> * Kafka Read: https://the-asf.slack.com/archives/C9H0YNP3P/p1606155042346600



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-11325) KafkaIO should be able to read from new added topic/partition automatically during pipeline execution time

2021-01-07 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17260976#comment-17260976
 ] 

Boyuan Zhang edited comment on BEAM-11325 at 1/8/21, 3:57 AM:
--

Started the design: 
https://docs.google.com/document/d/1FU3GxVRetHPLVizP3Mdv6mP5tpjZ3fd99qNjUI5DT5k/edit?usp=sharing


was (Author: boyuanz):
Started the design: 
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L265

> KafkaIO should be able to read from new added topic/partition automatically 
> during pipeline execution time
> --
>
> Key: BEAM-11325
> URL: https://issues.apache.org/jira/browse/BEAM-11325
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Boyuan Zhang
>Priority: P2
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11325) KafkaIO should be able to read from new added topic/partition automatically during pipeline execution time

2021-01-07 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17260976#comment-17260976
 ] 

Boyuan Zhang commented on BEAM-11325:
-

Started the design: 
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L265

> KafkaIO should be able to read from new added topic/partition automatically 
> during pipeline execution time
> --
>
> Key: BEAM-11325
> URL: https://issues.apache.org/jira/browse/BEAM-11325
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Boyuan Zhang
>Priority: P2
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-11132) Remove Experiment annotation from SDF API

2021-01-07 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang updated BEAM-11132:

Fix Version/s: 2.26.0
   Resolution: Fixed
   Status: Resolved  (was: Open)

> Remove Experiment annotation from SDF API
> -
>
> Key: BEAM-11132
> URL: https://issues.apache.org/jira/browse/BEAM-11132
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P2
>  Labels: Clarified, Done, stale-assigned
> Fix For: 2.26.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-11537) PR13598 breaks Java Post Commit

2020-12-29 Thread Boyuan Zhang (Jira)
Boyuan Zhang created BEAM-11537:
---

 Summary: PR13598 breaks Java Post Commit
 Key: BEAM-11537
 URL: https://issues.apache.org/jira/browse/BEAM-11537
 Project: Beam
  Issue Type: Test
  Components: test-failures
Reporter: Boyuan Zhang
Assignee: Brian Hulette


https://ci-beam.apache.org/job/beam_PostCommit_Java/6997/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11524) (Deprecated) WriteStringsToPubSub broken on Dataflow

2020-12-24 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254654#comment-17254654
 ] 

Boyuan Zhang commented on BEAM-11524:
-

Yes. Python doesn't offer a custom pubsub source/sink.

I opened https://github.com/apache/beam/pull/13614 to fix WriteStringsToPubSub

> (Deprecated) WriteStringsToPubSub broken on Dataflow
> 
>
> Key: BEAM-11524
> URL: https://issues.apache.org/jira/browse/BEAM-11524
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp, sdk-py-core
>Affects Versions: 2.26.0
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: P1
> Fix For: 2.27.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Actions to address this:
> - Fix WriteStringsToPubSub
> - Replace usage of WriteStringsToPubSub in sql_taxi example (and any other 
> examples)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-11403) Unbounded SDF wrapper causes performance regression on DirectRunner

2020-12-21 Thread Boyuan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253078#comment-17253078
 ] 

Boyuan Zhang commented on BEAM-11403:
-

Discussion thread: 
https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E


> Unbounded SDF wrapper causes performance regression on DirectRunner
> ---
>
> Key: BEAM-11403
> URL: https://issues.apache.org/jira/browse/BEAM-11403
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct, sdk-java-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P2
>
> There is a significant performance regression when switching from 
> UnboundedSource to Unbounded SDF wrapper. So far there are 2 IOs reported:
> * Pubsub Read: 
> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
> * Kafka Read: https://the-asf.slack.com/archives/C9H0YNP3P/p1606155042346600



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-11403) Unbounded SDF wrapper causes performance regression on DirectRunner

2020-12-21 Thread Boyuan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-11403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang reassigned BEAM-11403:
---

Assignee: Boyuan Zhang

> Unbounded SDF wrapper causes performance regression on DirectRunner
> ---
>
> Key: BEAM-11403
> URL: https://issues.apache.org/jira/browse/BEAM-11403
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct, sdk-java-core
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: P2
>
> There is a significant performance regression when switching from 
> UnboundedSource to Unbounded SDF wrapper. So far there are 2 IOs reported:
> * Pubsub Read: 
> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
> * Kafka Read: https://the-asf.slack.com/archives/C9H0YNP3P/p1606155042346600



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

