[ 
https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364513#comment-17364513
 ] 

Jasminder pal singh sehgal commented on BEAM-12494:
---------------------------------------------------

!image-2021-06-16-16-54-25-161.png!

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
>                 Key: BEAM-12494
>                 URL: https://issues.apache.org/jira/browse/BEAM-12494
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.28.0
>         Environment: IntelliJ community version, Maven, Windows, Dataflow 
> version 2.28.0
>            Reporter: Jasminder pal singh sehgal
>            Priority: P2
>             Fix For: Not applicable
>
>         Attachments: CodeSnippet.JPG, SuccessfulJobRun-KafkaIngestion.txt, 
> TimeOutLogs_KafkaIngestion.txt, image-2021-06-16-16-54-25-161.png
>
>
> Hello,
> Our team is unable to launch a streaming Dataflow Kafka job from IntelliJ
> when Kafka is hosted on a private subnet.
> Our hypothesis is that at graph construction time [0], Beam executes part of
> the pipeline code locally and tries to check all connections. In our case, we
> cannot reach the subnet from IntelliJ or from the Cloud console. We can
> reach it from a Compute Engine instance created within that subnet.
> We reached out to Google support, and they suggested that we raise a defect
> with you. The following code throws a *time-out* error when we run it from
> IntelliJ.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>         .withConsumerConfigUpdates(propertyBuilder)
>         .withConsumerConfigUpdates(
>                 ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
>         )
>         .withBootstrapServers(options.getBootstrapServers())
>         .withTopics(topicsList)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .commitOffsetsInFinalize()
>        // .withMaxNumRecords(5)
> )
> {code}
> However, if we uncomment
> {code:java}
> .withMaxNumRecords(5){code}
> the code works, and we are able to start a Dataflow job in the desired
> subnet to ingest the Kafka stream.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>         .withConsumerConfigUpdates(propertyBuilder)
>         .withConsumerConfigUpdates(
>                 ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
>         )
>         .withBootstrapServers(options.getBootstrapServers())
>         .withTopics(topicsList)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .commitOffsetsInFinalize()
>         .withMaxNumRecords(5)
> )
> {code}
> The issue with the above code is that the Dataflow job stops after ingesting
> the given number of records and behaves like batch ingestion instead of
> streaming, which we don't want.
> *Google support team hypothesis:*
> The current hypothesis is that the failure occurs in
> `KafkaUnboundedSource.split()` [1], which fails because it cannot fetch topic
> information.
> First, `withMaxNumRecords` is intended for testing [2]. When it is
> specified, the unbounded read is converted into a bounded read in
> `BoundedReadFromUnboundedSource` [3]; without `withMaxNumRecords`, the
> pipeline remains unbounded.
> When the pipeline is bounded (`withMaxNumRecords` is set), `split()` runs on
> a Dataflow worker in `SplitFn` [4]. Because it runs on Dataflow, it has no
> trouble connecting to Kafka.
> But when the pipeline is unbounded (`withMaxNumRecords` commented out),
> `split()` is called locally while the pipeline is built, during the graph
> construction phase [5][6], at which point it has no access to Kafka.
>
> [0] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
> [1] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57]
> [2] [https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-]
> [3] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193]
> [4] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169]
> [5] [https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87]
> [6] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
>
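One way to check the premise of the hypothesis in the description (that the machine launching the pipeline cannot reach the Kafka brokers, while a machine inside the subnet can) is a plain TCP probe against the bootstrap servers. The following is a minimal sketch using only the JDK; it is not part of Beam or KafkaIO. The class name `BootstrapReachability`, the helpers `parse` and `isReachable`, the 3000 ms timeout, and the `localhost:9092` default are all assumptions made for illustration. A successful TCP connect only shows that the port is reachable; it does not prove that a Kafka client can fetch topic metadata.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper, not part of Beam or KafkaIO.
final class BootstrapReachability {

    // Splits a "host1:9092,host2:9092" string into unresolved addresses
    // (no DNS lookup happens at this point).
    public static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port, got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    // Attempts a TCP connect with a timeout. Returns false on timeout,
    // connection refusal, or an unresolvable host name.
    public static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Re-create the address so the host name is resolved now.
            socket.connect(
                new InetSocketAddress(address.getHostString(), address.getPort()),
                timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default; pass the real bootstrap servers as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                + " reachable=" + isReachable(address, 3000));
        }
    }
}
```

Running this once from the workstation that launches the job and once from a Compute Engine instance inside the subnet, with the same value that is passed to `withBootstrapServers`, would show whether the two environments differ in basic network reachability.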



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
