soumilshah1995 opened a new issue, #11417: URL: https://github.com/apache/hudi/issues/11417
When running the Hoodie DeltaStreamer with two writers simultaneously, one for the US partition and one for the IN partition of the same table, one of the writers fails with a NullPointerException. The failure occurs while fetching offsets from Kafka.

![image](https://github.com/apache/hudi/assets/39345855/9997c228-ff87-4650-9e9b-55e8bf215ce0)

# Steps

### Spin up the stack

```
version: "3"

services:
  trino-coordinator:
    image: 'trinodb/trino:400'
    hostname: trino-coordinator
    ports:
      - '8080:8080'
    volumes:
      - ./trino/etc:/etc/trino

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  fast-data-dev:
    image: dougdonohoe/fast-data-dev
    ports:
      - "3181:3181"
      - "3040:3040"
      - "7081:7081"
      - "7082:7082"
      - "7083:7083"
      - "7092:7092"
      - "8081:8081"
    environment:
      - ZK_PORT=3181
      - WEB_PORT=3040
      - REGISTRY_PORT=8081
      - REST_PORT=7082
      - CONNECT_PORT=7083
      - BROKER_PORT=7092
      - ADV_HOST=127.0.0.1

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi
```

# Publish some data

```
from faker import Faker
from time import sleep
import random
import uuid
from datetime import datetime

from kafka_schema_registry import prepare_producer

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['localhost:7092']
SCHEMA_REGISTRY_URL = 'http://localhost:8081'
NUM_MESSAGES = 20
SLEEP_INTERVAL = 1
TOPIC_NAME = 'orders'
NUM_PARTITIONS = 1
REPLICATION_FACTOR = 1

# Avro Schema
SAMPLE_SCHEMA = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "order_value", "type": "string"},
        {"name": "priority", "type": "string"},
        {"name": "order_date", "type": "string"},
        {"name": "customer_id", "type": "string"},
        {"name": "ts", "type": "string"},
        {"name": "country", "type": "string"}
    ]
}

# Kafka Producer
producer = prepare_producer(
    KAFKA_BOOTSTRAP_SERVERS,
    SCHEMA_REGISTRY_URL,
    TOPIC_NAME,
    NUM_PARTITIONS,
    REPLICATION_FACTOR,
    value_schema=SAMPLE_SCHEMA
)

# Faker instance
faker = Faker()


class DataGenerator:
    @staticmethod
    def get_orders_data():
        """Generate and return a dictionary with mock order data."""
        country = random.choice(['US', 'IN'])
        return {
            "order_id": str(uuid.uuid4()),
            "name": faker.text(max_nb_chars=20),
            "order_value": str(random.randint(10, 1000)),
            "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
            "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
            "customer_id": str(uuid.uuid4()),
            "ts": str(datetime.now().timestamp()),
            "country": country
        }

    @staticmethod
    def produce_avro_message(producer, data):
        """Send an Avro message to the Kafka topic matching the record's country."""
        topic = 'orders_in' if data['country'] == 'IN' else 'orders_us'
        producer.send(topic, data)


# Generate and send order data
for _ in range(NUM_MESSAGES):
    order_data = DataGenerator.get_orders_data()
    DataGenerator.produce_avro_message(producer, order_data)
    print("Order Payload:", order_data)
    sleep(SLEEP_INTERVAL)
```
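Before starting the writers, it can help to confirm that records actually landed on both topics. A minimal sketch, assuming the `kafka-python` package is available (any console consumer works just as well; values are Confluent-Avro encoded, so this only counts records rather than decoding them):

```
from kafka import KafkaConsumer

# Count raw records per topic; values are Confluent-Avro encoded,
# so we do not attempt to decode them here.
for topic in ('orders_us', 'orders_in'):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:7092'],
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,  # stop iterating once the topic is drained
    )
    print(topic, sum(1 for _ in consumer))
    consumer.close()
```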
""" country = random.choice(['US', 'IN']) # Define country variable return { "order_id": str(uuid.uuid4()), "name": faker.text(max_nb_chars=20), "order_value": str(random.randint(10, 1000)), "priority": random.choice(["LOW", "MEDIUM", "HIGH"]), "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'), "customer_id": str(uuid.uuid4()), "ts": str(datetime.now().timestamp()), "country": country } @staticmethod def produce_avro_message(producer, data): """ Produce an Avro message and send it to the appropriate Kafka topic based on the country. """ topic = 'orders_in' if data['country'] == 'IN' else 'orders_us' producer.send(topic, data) # Generate and send order data for _ in range(NUM_MESSAGES): order_data = DataGenerator.get_orders_data() print(order_data, type(order_data)) DataGenerator.produce_avro_message(producer, order_data) print("Order Payload:", order_data) sleep(SLEEP_INTERVAL) ``` # Job 1 ``` spark-submit \ --class org.apache.hudi.utilities.streamer.HoodieStreamer \ --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \ --properties-file spark-config.properties \ --master 'local[*]' \ --executor-memory 1g \ /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \ --table-type COPY_ON_WRITE \ --op UPSERT \ --min-sync-interval-seconds 60 \ --continuous \ --source-ordering-field ts \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --target-base-path 'file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders' \ --target-table orders \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --transformer-class 'org.apache.hudi.utilities.transform.SqlFileBasedTransformer' \ --props tbl_hudi_us.props ``` tbl_hudi_us.props ``` hoodie.datasource.write.recordkey.field=order_id hoodie.datasource.write.partitionpath.field=country hoodie.datasource.write.precombine.field=ts # Kafka Prop bootstrap.servers=localhost:7092 auto.offset.reset=earliest hoodie.deltastreamer.source.kafka.topic=orders_us hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer schema.registry.url=http://localhost:8081/ hoodie.deltastreamer.schemaprovider.registry.schemaconverter= hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest # Hive Sync hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083 hoodie.datasource.hive_sync.mode=hms hoodie.datasource.hive_sync.enable=true hoodie.datasource.hive_sync.database=default hoodie.datasource.hive_sync.table=orders hoodie.datasource.write.hive_style_partitioning=true # Locks Providers hoodie.write.concurrency.mode=optimistic_concurrency_control hoodie.cleaner.policy.failed.writes=LAZY hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider hoodie.streamer.transformer.sql.file=/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/us.sql ``` us.sql ``` SELECT * FROM <SRC> a ``` # job 2 ``` spark-submit \ --class org.apache.hudi.utilities.streamer.HoodieStreamer \ --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \ --properties-file spark-config.properties \ --master 'local[*]' \ --executor-memory 1g \ 
# Job 2

```
spark-submit \
--class org.apache.hudi.utilities.streamer.HoodieStreamer \
--packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \
--properties-file spark-config.properties \
--master 'local[*]' \
--executor-memory 1g \
/Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
--table-type COPY_ON_WRITE \
--op UPSERT \
--min-sync-interval-seconds 60 \
--continuous \
--source-ordering-field ts \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--target-base-path 'file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders' \
--target-table orders \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--transformer-class 'org.apache.hudi.utilities.transform.SqlFileBasedTransformer' \
--props tbl_hudi_in.props
```

tbl_hudi_in.props

```
hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=country
hoodie.datasource.write.precombine.field=ts

# Kafka Prop
bootstrap.servers=localhost:7092
auto.offset.reset=earliest
hoodie.deltastreamer.source.kafka.topic=orders_in
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
schema.registry.url=http://localhost:8081/
hoodie.deltastreamer.schemaprovider.registry.schemaconverter=
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders-value/versions/latest

# Hive Sync
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.database=default
hoodie.datasource.hive_sync.table=orders
hoodie.datasource.write.hive_style_partitioning=true

# Locks Providers
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider

hoodie.streamer.transformer.sql.file=/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/in.sql
```

in.sql

```
SELECT * FROM <SRC> a
```

spark-config.properties

```
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.hive.convertMetastoreParquet=false
```
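Note that both jobs point at the same `--target-base-path`, so they also share the commit metadata where the streamer stores its Kafka checkpoint. To see whose checkpoint the latest commit carries, the commit files can be inspected directly (a sketch; it assumes the `.commit` files are plain JSON, as they are for this COPY_ON_WRITE table on 0.14.0, and that the checkpoint sits in `extraMetadata` under `deltastreamer.checkpoint.key`):

```
import glob
import json
import os

# Same base path both jobs write to.
base = '/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders'

# Completed .commit files are JSON; HoodieStreamer stores its Kafka
# checkpoint in extraMetadata under 'deltastreamer.checkpoint.key'.
for f in sorted(glob.glob(os.path.join(base, '.hoodie', '*.commit'))):
    with open(f) as fh:
        meta = json.load(fh)
    checkpoint = meta.get('extraMetadata', {}).get('deltastreamer.checkpoint.key')
    print(os.path.basename(f), checkpoint)
```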
### One of the writers fails

```
name=orders
24/06/08 09:58:02 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/08 09:58:02 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:////Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders
24/06/08 09:58:02 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240608095731003__commit__COMPLETED__20240608095736885]}
24/06/08 09:58:02 INFO StreamSync: Checkpoint to resume from : Option{val=orders_in,0:8}
24/06/08 09:58:02 INFO ConsumerConfig: ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:7092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-null-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = null
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
24/06/08 09:58:02 INFO KafkaAvroDeserializerConfig: KafkaAvroDeserializerConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [http://localhost:8081/]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    specific.avro.reader = false
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
24/06/08 09:58:02 WARN ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.value.deserializer.class' was supplied but isn't a known config.
24/06/08 09:58:02 WARN ConsumerConfig: The configuration 'hoodie.streamer.source.kafka.value.deserializer.schema' was supplied but isn't a known config.
24/06/08 09:58:02 INFO AppInfoParser: Kafka version: 2.8.0
24/06/08 09:58:02 INFO AppInfoParser: Kafka commitId: ebb1d6e21cc92130
24/06/08 09:58:02 INFO AppInfoParser: Kafka startTimeMs: 1717855082275
24/06/08 09:58:02 INFO Metadata: [Consumer clientId=consumer-null-1, groupId=null] Cluster ID: Vd3KoCf_Qbu1gx33_kRL3g
24/06/08 09:58:02 INFO Metrics: Metrics scheduler closed
24/06/08 09:58:02 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
24/06/08 09:58:02 INFO Metrics: Metrics reporters closed
24/06/08 09:58:02 INFO AppInfoParser: App info kafka.consumer for consumer-null-1 unregistered
24/06/08 09:58:02 ERROR HoodieStreamer: Shutting down delta-sync due to exception
java.lang.NullPointerException
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
    at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
    at java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
    at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
    at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
    at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
    at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
24/06/08 09:58:02 INFO HoodieStreamer: Delta Sync shutdown. Error ?true
24/06/08 09:58:02 INFO HoodieStreamer: Ingestion completed. Has error: true
24/06/08 09:58:02 INFO StreamSync: Shutting down embedded timeline server
24/06/08 09:58:02 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:796)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
    at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
    at java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
    at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
    at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
    at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
    at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
    ... 4 more
24/06/08 09:58:02 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/06/08 09:58:02 INFO SparkUI: Stopped Spark web UI at http://soumils-mbp:8091
24/06/08 09:58:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/06/08 09:58:02 INFO MemoryStore: MemoryStore cleared
24/06/08 09:58:02 INFO BlockManager: BlockManager stopped
24/06/08 09:58:02 INFO BlockManagerMaster: BlockManagerMaster stopped
24/06/08 09:58:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/06/08 09:58:02 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:67)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
    ... 15 more
Caused by: org.apache.hudi.exception.HoodieException
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:796)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:340)
    at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
    at java.base/java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1785)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:528)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:340)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:255)
    at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:63)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
    at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:228)
    at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:527)
    at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
    ... 4 more
24/06/08 09:58:02 INFO ShutdownHookManager: Shutdown hook called
24/06/08 09:58:02 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-c4a23229-86f5-4787-94bc-4c32ab8066c4
24/06/08 09:58:02 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-0c931763-81aa-4831-ab7e-9b1b7e2a3103
```
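What the log suggests: the failing writer reads `orders_us`, but resumes from the checkpoint `orders_in,0:8` written by the other job, since both jobs share the same `--target-base-path` and therefore the same commit metadata. `KafkaOffsetGen.fetchValidOffsets` then presumably looks up a `TopicPartition` of `orders_in` in the offset map it fetched for `orders_us`, and the missing entry surfaces as the NullPointerException. A hypothetical simplification of that mismatch (not the actual Hudi code, which is Java at KafkaOffsetGen.java:340):

```
# Hypothetical simplification of the checkpoint/offset mismatch.

# Offsets this job fetched from Kafka, keyed by its own topic partitions:
earliest_offsets = {('orders_us', 0): 0}

# Checkpoint read back from the shared table's commit metadata, written by
# the *other* job:
checkpoint_offsets = {('orders_in', 0): 8}

for tp, committed in checkpoint_offsets.items():
    earliest = earliest_offsets.get(tp)  # None here, a null in the Java code
    if earliest is None:
        print(f'{tp}: no fetched offset for checkpointed partition -> NPE')
```

If that reading is right, giving each topic its own `--target-base-path`/`--target-table` (or ingesting both topics with a single streamer) should avoid the collision, though the NPE itself still looks worth guarding against in `fetchValidOffsets`.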