[ https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373803#comment-17373803 ]
Boyuan Zhang commented on BEAM-12494:
-------------------------------------

I believe this is intended behavior on runner v1. If you want to bypass this issue, there are two options:
 * Use runner_v2 on Dataflow instead. In order to do so, please reach out to Dataflow Customer Support.
 * Use withTopicPartitions() instead of withTopic()/withTopics() to read from Kafka.
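A minimal sketch of the first option, assuming Runner v2 has already been enabled for the project by Dataflow support; use_runner_v2 is the usual Dataflow experiment name for opting in, and args stands for the program's command-line arguments:
{code:java}
import java.util.Arrays;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
// Equivalent to passing --experiments=use_runner_v2 on the command line.
// On Runner v2 the Kafka read is expanded and split on the Dataflow workers,
// so the unbounded source is not split locally at graph construction time.
options.setExperiments(Arrays.asList("use_runner_v2"));
Pipeline pipeline = Pipeline.create(options);
{code}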
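And a minimal sketch of the second option, assuming the topic's partitions are known up front. The topic name and partition numbers are placeholders, pipeline and options are the same objects as in the snippet quoted below, and the withConsumerConfigUpdates(...) calls from that snippet carry over unchanged:
{code:java}
import java.util.Arrays;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// With the TopicPartitions listed explicitly, KafkaUnboundedSource.split()
// uses them directly instead of fetching topic metadata from the brokers at
// graph construction time, which is the step that times out when the brokers
// are only reachable from inside the private subnet.
pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
    .withBootstrapServers(options.getBootstrapServers())
    .withTopicPartitions(Arrays.asList(
        new TopicPartition("my-topic", 0),  // placeholder topic/partitions
        new TopicPartition("my-topic", 1)))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .commitOffsetsInFinalize());
{code}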
> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
>                 Key: BEAM-12494
>                 URL: https://issues.apache.org/jira/browse/BEAM-12494
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.28.0
>         Environment: IntelliJ community version, Maven, Windows, Dataflow version 2.28.0
>            Reporter: Jasminder pal singh sehgal
>            Priority: P2
>             Fix For: Not applicable
>
>         Attachments: CodeSnippet.JPG, LogsStreamingEngine.txt, SuccessfulJobRun-KafkaIngestion.txt, TimeOutLogs_KafkaIngestion.txt, image-2021-06-16-16-54-25-161.png, image-2021-06-16-16-55-57-509.png, image-2021-06-20-22-20-24-363.png, image-2021-06-20-22-23-14-052.png, image-2021-06-21-15-00-09-851.png
>
> Hello,
>
> Our team is facing an issue in streaming the Dataflow Kafka job through IntelliJ when the Kafka cluster is hosted on a private subnet.
>
> The hypothesis is that at graph construction time [0], Beam locally tries to execute the code and check all the connections. In our case, we don't have access to the subnet from IntelliJ or from the Cloud Console. We do have access when a Compute Engine instance is created within that subnet.
>
> We reached out to Google support and they suggested that we raise a defect with you. The following code throws a *time-out* error when we execute it through IntelliJ.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>     .withConsumerConfigUpdates(propertyBuilder)
>     .withConsumerConfigUpdates(
>         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
>     .withBootstrapServers(options.getBootstrapServers())
>     .withTopics(topicsList)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .commitOffsetsInFinalize()
>     // .withMaxNumRecords(5)
> );
> {code}
> But if we uncomment
> {code:java}
> .withMaxNumRecords(){code}
> the code works perfectly and we are able to spin up a Dataflow job in the desired subnet to ingest the Kafka stream.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>     .withConsumerConfigUpdates(propertyBuilder)
>     .withConsumerConfigUpdates(
>         ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
>     .withBootstrapServers(options.getBootstrapServers())
>     .withTopics(topicsList)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .commitOffsetsInFinalize()
>     .withMaxNumRecords(5)
> );
> {code}
> The issue with the above code is that Dataflow will stop after ingesting the given number of records and will act like batch ingestion instead of streaming, which we don't want.
>
> *Google support team hypothesis:*
>
> The current hypothesis is that the issue is happening in `KafkaUnboundedSource.split()` [1], which fails because it is unable to get topic information.
> The first point is that `withMaxNumRecords` is meant for testing [2]; when it is specified, the unbounded read is converted into a bounded read via `BoundedReadFromUnboundedSource` [3], but without `withMaxNumRecords` the pipeline is still unbounded.
> When the pipeline is bounded (with `withMaxNumRecords`), `split()` happens on a Dataflow worker in `SplitFn` [4]. Since it ran on Dataflow, it had no issue connecting to Kafka.
> But when the pipeline is unbounded (`withMaxNumRecords` commented out), `split()` is called while the pipeline is built locally at graph construction time [5][6], at which point it does not have access to Kafka.
>
> [0] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
> [1] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57]
> [2] [https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-]
> [3] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193]
> [4] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169]
> [5] [https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87]
> [6] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)