soumilshah1995 opened a new issue, #11417:
URL: https://github.com/apache/hudi/issues/11417

   When running Hudi DeltaStreamer (HoodieStreamer) with two writers simultaneously against the same target table, one ingesting the US partition and the other the IN partition, one of the writers fails with a NullPointerException. The failure occurs while fetching offsets from Kafka (KafkaOffsetGen.fetchValidOffsets).
   
   
![image](https://github.com/apache/hudi/assets/39345855/9997c228-ff87-4650-9e9b-55e8bf215ce0)
   
   # Steps 
   ### Spin up the stack
   ```
   version: "3"
   
   services:
     trino-coordinator:
       image: 'trinodb/trino:400'
       hostname: trino-coordinator
       ports:
         - '8080:8080'
       volumes:
         - ./trino/etc:/etc/trino
   
     metastore_db:
       image: postgres:11
       hostname: metastore_db
       ports:
         - 5432:5432
       environment:
         POSTGRES_USER: hive
         POSTGRES_PASSWORD: hive
         POSTGRES_DB: metastore
   
     hive-metastore:
       hostname: hive-metastore
       image: 'starburstdata/hive:3.1.2-e.18'
       ports:
         - '9083:9083' # Metastore Thrift
       environment:
         HIVE_METASTORE_DRIVER: org.postgresql.Driver
         HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
         HIVE_METASTORE_USER: hive
         HIVE_METASTORE_PASSWORD: hive
         HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
         S3_ENDPOINT: http://minio:9000
         S3_ACCESS_KEY: admin
         S3_SECRET_KEY: password
         S3_PATH_STYLE_ACCESS: "true"
         REGION: ""
         GOOGLE_CLOUD_KEY_FILE_PATH: ""
         AZURE_ADL_CLIENT_ID: ""
         AZURE_ADL_CREDENTIAL: ""
         AZURE_ADL_REFRESH_URL: ""
         AZURE_ABFS_STORAGE_ACCOUNT: ""
         AZURE_ABFS_ACCESS_KEY: ""
         AZURE_WASB_STORAGE_ACCOUNT: ""
         AZURE_ABFS_OAUTH: ""
         AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
         AZURE_ABFS_OAUTH_CLIENT_ID: ""
         AZURE_ABFS_OAUTH_SECRET: ""
         AZURE_ABFS_OAUTH_ENDPOINT: ""
         AZURE_WASB_ACCESS_KEY: ""
         HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
       depends_on:
         - metastore_db
       healthcheck:
         test: bash -c "exec 6<> /dev/tcp/localhost/9083"
   
   
     fast-data-dev:
       image: dougdonohoe/fast-data-dev
       ports:
         - "3181:3181"
         - "3040:3040"
         - "7081:7081"
         - "7082:7082"
         - "7083:7083"
         - "7092:7092"
         - "8081:8081"
       environment:
         - ZK_PORT=3181
         - WEB_PORT=3040
         - REGISTRY_PORT=8081
         - REST_PORT=7082
         - CONNECT_PORT=7083
         - BROKER_PORT=7092
         - ADV_HOST=127.0.0.1
   
   volumes:
     hive-metastore-postgresql:
   
   networks:
     default:
       name: hudi
   ```
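
   Bring the stack up before publishing any data (assuming the compose file above is saved as docker-compose.yml):
    ```
    # Start all services defined above: Trino, Postgres, the Hive metastore,
    # and fast-data-dev (Kafka broker + Schema Registry on port 8081)
    docker compose up -d

    # Wait until every container reports as running/healthy before moving on
    docker compose ps
    ```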
   
   # Publish some data
   ```
   from faker import Faker
   from time import sleep
   import random
   import uuid
   from datetime import datetime
   from kafka_schema_registry import prepare_producer
   
   # Configuration
   KAFKA_BOOTSTRAP_SERVERS = ['localhost:7092']
   SCHEMA_REGISTRY_URL = 'http://localhost:8081'
   NUM_MESSAGES = 20
   SLEEP_INTERVAL = 1
   TOPIC_NAME = 'orders'
   NUM_PARTITIONS = 1
   REPLICATION_FACTOR = 1
   
   # Avro Schema
   SAMPLE_SCHEMA = {
       "type": "record",
       "name": "Order",
       "fields": [
           {"name": "order_id", "type": "string"},
           {"name": "name", "type": "string"},
           {"name": "order_value", "type": "string"},
           {"name": "priority", "type": "string"},
           {"name": "order_date", "type": "string"},
           {"name": "customer_id", "type": "string"},
           {"name": "ts", "type": "string"},
           {"name": "country", "type": "string"}
       ]
   }
   
   # Kafka Producer
   producer = prepare_producer(
       KAFKA_BOOTSTRAP_SERVERS,
       SCHEMA_REGISTRY_URL,
       TOPIC_NAME,
       NUM_PARTITIONS,
       REPLICATION_FACTOR,
       value_schema=SAMPLE_SCHEMA
   )
   
   # Faker instance
   faker = Faker()
   
   
   class DataGenerator:
       @staticmethod
       def get_orders_data():
           """
           Generate and return a dictionary with mock order data.
           """
           country = random.choice(['US', 'IN'])  # Define country variable
   
           return {
               "order_id": str(uuid.uuid4()),
               "name": faker.text(max_nb_chars=20),
               "order_value": str(random.randint(10, 1000)),
               "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
               "order_date": faker.date_between(start_date='-30d', 
end_date='today').strftime('%Y-%m-%d'),
               "customer_id": str(uuid.uuid4()),
               "ts": str(datetime.now().timestamp()),
               "country": country
           }
   
       @staticmethod
       def produce_avro_message(producer, data):
           """
            Produce an Avro message and send it to the appropriate Kafka topic based on the country.
           """
           topic = 'orders_in' if data['country'] == 'IN' else 'orders_us'
           producer.send(topic, data)
   
   
   # Generate and send order data
   for _ in range(NUM_MESSAGES):
       order_data = DataGenerator.get_orders_data()
       print(order_data, type(order_data))
       DataGenerator.produce_avro_message(producer, order_data)
       print("Order Payload:", order_data)
       sleep(SLEEP_INTERVAL)
   
   ```
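
   To run the producer I install the two Python dependencies it imports and execute the script (the file name produce_orders.py is just what I use locally):
    ```
    # pip package names assumed from the imports above (faker, kafka_schema_registry)
    pip install faker kafka-schema-registry

    # Publishes NUM_MESSAGES Avro records, routing each one to orders_us or orders_in by country
    python produce_orders.py
    ```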
   
   
   # Job 1
   ```
   
    spark-submit \
        --class org.apache.hudi.utilities.streamer.HoodieStreamer \
        --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \
        --properties-file spark-config.properties \
        --master 'local[*]' \
        --executor-memory 1g \
        /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
        --table-type COPY_ON_WRITE \
        --op UPSERT \
        --min-sync-interval-seconds 60 \
        --continuous \
        --source-ordering-field ts \
        --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
        --target-base-path 'file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders' \
        --target-table orders \
        --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
        --transformer-class 'org.apache.hudi.utilities.transform.SqlFileBasedTransformer' \
        --props tbl_hudi_us.props
   
   ```
   
   tbl_hudi_us.props
   ```
    hoodie.datasource.write.recordkey.field=order_id
    hoodie.datasource.write.partitionpath.field=country
    hoodie.datasource.write.precombine.field=ts

    # Kafka Prop
    bootstrap.servers=localhost:7092
    auto.offset.reset=earliest
    hoodie.deltastreamer.source.kafka.topic=orders_us
    hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
    schema.registry.url=http://localhost:8081/
    hoodie.deltastreamer.schemaprovider.registry.schemaconverter=
    hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest

    # Hive Sync
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
    hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083
    hoodie.datasource.hive_sync.mode=hms
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.database=default
    hoodie.datasource.hive_sync.table=orders
    hoodie.datasource.write.hive_style_partitioning=true

    # Locks Providers
    hoodie.write.concurrency.mode=optimistic_concurrency_control
    hoodie.cleaner.policy.failed.writes=LAZY
    hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider

    hoodie.streamer.transformer.sql.file=/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/us.sql
   ```
   
   us.sql
   ```
   SELECT
       *
   FROM
           <SRC> a
   ```
   
   
   
   # Job 2
   ```
    spark-submit \
        --class org.apache.hudi.utilities.streamer.HoodieStreamer \
        --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \
        --properties-file spark-config.properties \
        --master 'local[*]' \
        --executor-memory 1g \
        /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
        --table-type COPY_ON_WRITE \
        --op UPSERT \
        --min-sync-interval-seconds 60 \
        --continuous \
        --source-ordering-field ts \
        --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
        --target-base-path 'file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders' \
        --target-table orders \
        --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
        --transformer-class 'org.apache.hudi.utilities.transform.SqlFileBasedTransformer' \
        --props tbl_hudi_in.props
   ```
   
   tbl_hudi_in.props
   ```
   
    hoodie.datasource.write.recordkey.field=order_id
    hoodie.datasource.write.partitionpath.field=country
    hoodie.datasource.write.precombine.field=ts

    # Kafka Prop
    bootstrap.servers=localhost:7092
    auto.offset.reset=earliest
    hoodie.deltastreamer.source.kafka.topic=orders_in
    hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
    schema.registry.url=http://localhost:8081/
    hoodie.deltastreamer.schemaprovider.registry.schemaconverter=
    hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest

    # Hive Sync
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
    hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083
    hoodie.datasource.hive_sync.mode=hms
    hoodie.datasource.hive_sync.enable=true
    hoodie.datasource.hive_sync.database=default
    hoodie.datasource.hive_sync.table=orders
    hoodie.datasource.write.hive_style_partitioning=true

    # Locks Providers
    hoodie.write.concurrency.mode=optimistic_concurrency_control
    hoodie.cleaner.policy.failed.writes=LAZY
    hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider

    hoodie.streamer.transformer.sql.file=/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/in.sql
   ```
   
   in.sql
   ```
   SELECT
       *
   FROM
           <SRC> a
   ```
   
   spark-config.properties
   ```
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
    spark.sql.hive.convertMetastoreParquet=false
   ```
   
   ### One of the writers fails
   ```
   name=orders
   24/06/08 09:58:02 INFO HoodieTableConfig: Loading table properties from 
file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
   24/06/08 09:58:02 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from 
file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders
   24/06/08 09:58:02 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[20240608095731003__commit__COMPLETED__20240608095736885]}
   24/06/08 09:58:02 INFO StreamSync: Checkpoint to resume from : 
Option{val=orders_in,0:8}
   24/06/08 09:58:02 INFO ConsumerConfig: ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:7092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-null-1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = null
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   
   24/06/08 09:58:02 INFO KafkaAvroDeserializerConfig: 
KafkaAvroDeserializerConfig values: 
        bearer.auth.token = [hidden]
        schema.registry.url = [http://localhost:8081/]
        basic.auth.user.info = [hidden]
        auto.register.schemas = true
        max.schemas.per.subject = 1000
        basic.auth.credentials.source = URL
        schema.registry.basic.auth.user.info = [hidden]
        bearer.auth.credentials.source = STATIC_TOKEN
        specific.avro.reader = false
        value.subject.name.strategy = class 
io.confluent.kafka.serializers.subject.TopicNameStrategy
        key.subject.name.strategy = class 
io.confluent.kafka.serializers.subject.TopicNameStrategy
   
   24/06/08 09:58:02 WARN ConsumerConfig: The configuration 
'hoodie.deltastreamer.source.kafka.value.deserializer.class' was supplied but 
isn't a known config.
   24/06/08 09:58:02 WARN ConsumerConfig: The configuration 
'hoodie.streamer.source.kafka.value.deserializer.schema' was supplied but isn't 
a known config.
   24/06/08 09:58:02 INFO AppInfoParser: Kafka version: 2.8.0
   24/06/08 09:58:02 INFO AppInfoParser: Kafka commitId: ebb1d6e21cc92130
   24/06/08 09:58:02 INFO AppInfoParser: Kafka startTimeMs: 1717855082275
   24/06/08 09:58:02 INFO Metadata: [Consumer clientId=consumer-null-1, 
groupId=null] Cluster ID: Vd3KoCf_Qbu1gx33_kRL3g
   24/06/08 09:58:02 INFO Metrics: Metrics scheduler closed
   24/06/08 09:58:02 INFO Metrics: Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   24/06/08 09:58:02 INFO Metrics: Metrics reporters closed
   24/06/08 09:58:02 INFO AppInfoParser: App info kafka.consumer for 
consumer-null-1 unregistered
   24/06/08 09:58:02 ERROR HoodieStreamer: Shutting down delta-sync due to 
exception
   java.lang.NullPointerException
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
        at 
java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
        at 
java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
        at 
java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
        at 
java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
        at 
java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
        at 
org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
        at 
org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
        at 
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
        at 
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
        at 
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   24/06/08 09:58:02 INFO HoodieStreamer: Delta Sync shutdown. Error ?true
   24/06/08 09:58:02 INFO HoodieStreamer: Ingestion completed. Has error: true
   24/06/08 09:58:02 INFO StreamSync: Shutting down embedded timeline server
   24/06/08 09:58:02 ERROR HoodieAsyncService: Service shutdown with error
   java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException
        at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
        at 
org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: org.apache.hudi.exception.HoodieException
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:796)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.NullPointerException
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
        at 
java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
        at 
java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
        at 
java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
        at 
java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
        at 
java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
        at 
org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
        at 
org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
        at 
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
        at 
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
        at 
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
        ... 4 more
   24/06/08 09:58:02 INFO SparkContext: SparkContext is stopping with exitCode 
0.
   24/06/08 09:58:02 INFO SparkUI: Stopped Spark web UI at 
http://soumils-mbp:8091
   24/06/08 09:58:02 INFO MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
   24/06/08 09:58:02 INFO MemoryStore: MemoryStore cleared
   24/06/08 09:58:02 INFO BlockManager: BlockManager stopped
   24/06/08 09:58:02 INFO BlockManagerMaster: BlockManagerMaster stopped
   24/06/08 09:58:02 INFO 
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
   24/06/08 09:58:02 INFO SparkContext: Successfully stopped SparkContext
   Exception in thread "main" 
org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service 
was shut down with exception.
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:67)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException
        at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
        at 
org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
        ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:796)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.NullPointerException
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
        at 
java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
        at 
java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
        at 
java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
        at 
java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
        at 
java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
        at 
org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
        at 
org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
        at 
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
        at 
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
        at 
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
        ... 4 more
   24/06/08 09:58:02 INFO ShutdownHookManager: Shutdown hook called
   24/06/08 09:58:02 INFO ShutdownHookManager: Deleting directory 
/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-c4a23229-86f5-4787-94bc-4c32ab8066c4
   24/06/08 09:58:02 INFO ShutdownHookManager: Deleting directory 
/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-0c931763-81aa-4831-ab7e-9b1b7e2a3103
   (base) soumilshah@Soumils-MBP deltastreamerBroadcastJoins % 
   
   ```
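
   The stack trace points at the anyMatch lambda in KafkaOffsetGen.fetchValidOffsets. Since both jobs share the same --target-base-path, I suspect each writer can pick up the checkpoint the other one committed; the failing run above resumes from Option{val=orders_in,0:8}. To see what each commit actually stored, I grep the commit metadata on the shared table (a quick sketch; it assumes the checkpoint is recorded under the deltastreamer.checkpoint.key entry in the commit JSON, and the path is from my setup):
    ```
    # Inspect the commit metadata on the shared table and print the checkpoint each commit stored
    cd /Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie
    ls *.commit

    # Assumed metadata key: deltastreamer.checkpoint.key (hedged, based on the log line "Checkpoint to resume from")
    grep -o '"deltastreamer.checkpoint.key" *: *"[^"]*"' *.commit
    ```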
   