eigakow opened a new issue #1375: [SUPPORT] HoodieDeltaStreamer offset not 
handled correctly
URL: https://github.com/apache/incubator-hudi/issues/1375
 
 
   **Describe the problem you faced**
   
   I am trying to implement a continuous DeltaStreamer (`hudi-0.5.1-incubating`) from a Kafka source with 3 partitions. The topic has been running for a while, so the earliest offsets are no longer available on the Kafka server. When connecting with a new target-base-path:
   - the first message is processed correctly
   - the second one fails with `'org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions'`
   
   Any subsequent submission gives the same OffsetOutOfRangeException unless I provide a new target-base-path.
   
   I have added some logging to KafkaOffsetGen as suggested in [#1335](https://github.com/apache/incubator-hudi/issues/1335) and I see that:
   - for the first message there is no checkpoint present, so it goes with 'LATEST' as expected. The message is processed and the commit is successful.
   - for the next message the previous checkpoint is detected; however, it tries to pull all the previous messages from the Kafka topic because the fromOffsets value is empty (see the sketch below): `numEvents: 5000000, fromOffsets: {}, toOffsets: {fr-bru-0=338362, fr-bru-1=142427, fr-bru-2=142401}`
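   
   For illustration only, here is a minimal sketch (not the actual KafkaOffsetGen code, and assuming a `topic,partition:offset,...` checkpoint layout) of how an empty resumed checkpoint would surface as an empty fromOffsets map:
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch: an empty checkpoint string turns into an empty
   // fromOffsets map. The real KafkaOffsetGen logic may differ.
   public class CheckpointParseSketch {
   
     // Assumed checkpoint layout: "topic,partition:offset,partition:offset,..."
     static Map<Integer, Long> parseCheckpoint(String checkpoint) {
       Map<Integer, Long> fromOffsets = new HashMap<>();
       if (checkpoint == null || checkpoint.isEmpty()) {
         return fromOffsets; // matches the "fromOffsets: {}" seen in the logs
       }
       String[] parts = checkpoint.split(",");
       for (int i = 1; i < parts.length; i++) { // parts[0] is the topic name
         String[] kv = parts[i].split(":");
         fromOffsets.put(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
       }
       return fromOffsets;
     }
   
     public static void main(String[] args) {
       // The log shows "Checkpoint to resume from : Option{val=}", i.e. an empty value:
       System.out.println(parseCheckpoint(""));                              // {}
       System.out.println(parseCheckpoint("fr-bru,0:338362,1:142427,2:142401"));
     }
   }
   ```
   
   With an empty fromOffsets map the consumer effectively starts each partition from offset 0, which the broker has already purged; this would explain the `{fr-bru-0=0}` in the stacktrace below.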
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a custom properties file.
   ```
   hoodie.datasource.write.recordkey.field=full_count
   hoodie.datasource.write.partitionpath.field=full_count
   hoodie.deltastreamer.schemaprovider.source.schema.file=file:///home/director/me/hudi-0.5.1-incubating/schema.avro
   hoodie.deltastreamer.schemaprovider.target.schema.file=file:///home/director/me/hudi-0.5.1-incubating/schema.avro
   source-class=FR24JsonKafkaSource
   bootstrap.servers=streaming-kafka-broker-1:9092,streaming-kafka-broker-2:9092,streaming-kafka-broker-3:9092
   group.id=hudi_testing
   hoodie.deltastreamer.source.kafka.topic=fr-bru
   enable.auto.commit=false
   schemaprovider-class=org.apache.hudi.utilities.schema.FilebasedSchemaProvider
   auto.offset.reset=latest
   ```
   2. Launch spark-submit with HoodieDeltaStreamer
   ```
   spark-submit --master local \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --jars $(pwd)/../my-app-1-jar-with-dependencies.jar \
     $(pwd)/../hudi-0.5.1-incubating_latest/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.5.1-incubating.jar \
     --props file:///$(pwd)/hudi-fr24.properties \
     --target-base-path file:///tmp/test-hudi_new \
     --table-type MERGE_ON_READ \
     --target-table test_hudi_new \
     --source-class FR24JsonKafkaSource \
     --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
     --continuous
   ```
   
   **Expected behavior**
   
   The job handles the offsets correctly and continues to read Kafka messages.
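   
   As a rough illustration only (a sketch, not Hudi's implementation), the checkpointed offsets could be clamped to the broker's earliest retained offsets before the offset ranges are built, instead of failing:
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical sketch of the expected behaviour: never ask Kafka for an offset
   // that has already been purged; clamp to the earliest available offset instead.
   public class OffsetClampSketch {
   
     static Map<TopicPartition, Long> clampToEarliest(KafkaConsumer<?, ?> consumer,
                                                      Map<TopicPartition, Long> checkpointed) {
       // beginningOffsets() reports the earliest offset still available per partition
       Map<TopicPartition, Long> earliest = consumer.beginningOffsets(checkpointed.keySet());
       Map<TopicPartition, Long> clamped = new HashMap<>();
       checkpointed.forEach((tp, offset) ->
           clamped.put(tp, Math.max(offset, earliest.getOrDefault(tp, offset))));
       return clamped;
     }
   }
   ```
   
   Note that the executor-side consumers are forced to `auto.offset.reset=none` by spark-streaming-kafka (see the "overriding auto.offset.reset to none for executor" warning below), so any such correction would have to happen before the offset ranges are handed to the executors.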
   
   **Environment Description**
   
   * Hudi version : hudi-0.5.1-incubating
   
   * Spark version : 2.4.0-cdh6.1.0
   
   * Hive version : 2.1.1-cdh6.1.0
   
   * Hadoop version : 3.0.0-cdh6.1.0
   
   * Storage (HDFS/S3/GCS..) : tried both hdfs and local
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
    I am using a custom source class.
   
   **Stacktrace**
   
   ```
   20/03/04 10:21:02 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2059723432_1, ugi=iga@BIGDATAPOC.LOCAL (auth:KERBEROS)]]]
   20/03/04 10:21:02 INFO table.HoodieTableConfig: Loading table properties 
from /tmp/test-hudi_new/.hoodie/hoodie.properties
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Finished Loading Table 
of type COPY_ON_WRITE(version=1) from /tmp/test-hudi_new
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Loading Active commit 
timeline for /tmp/test-hudi_new
   20/03/04 10:21:02 INFO timeline.HoodieActiveTimeline: Loaded instants 
[[20200304102036__commit__COMPLETED]]
   20/03/04 10:21:02 INFO table.HoodieCopyOnWriteTable: Nothing to clean here. 
It is already clean
   20/03/04 10:21:02 INFO hudi.AbstractHoodieWriteClient: Committed 
20200304102036
   20/03/04 10:21:02 INFO deltastreamer.DeltaSync: Commit 20200304102036 
successful!
   20/03/04 10:21:02 INFO rdd.MapPartitionsRDD: Removing RDD 31 from 
persistence list
   20/03/04 10:21:02 INFO storage.BlockManager: Removing RDD 31
   20/03/04 10:21:02 INFO rdd.MapPartitionsRDD: Removing RDD 39 from 
persistence list
   20/03/04 10:21:02 INFO storage.BlockManager: Removing RDD 39
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Loading 
HoodieTableMetaClient from /tmp/test-hudi_new
   20/03/04 10:21:02 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2059723432_1, ugi=iga@BIGDATAPOC.LOCAL (auth:KERBEROS)]]]
   20/03/04 10:21:02 INFO table.HoodieTableConfig: Loading table properties 
from /tmp/test-hudi_new/.hoodie/hoodie.properties
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Finished Loading Table 
of type COPY_ON_WRITE(version=1) from /tmp/test-hudi_new
   20/03/04 10:21:02 INFO timeline.HoodieActiveTimeline: Loaded instants 
[[20200304102036__commit__COMPLETED]]
   20/03/04 10:21:02 INFO deltastreamer.DeltaSync: Checkpoint to resume from : 
Option{val=}
   20/03/04 10:21:02 INFO consumer.ConsumerConfig: ConsumerConfig values:
           auto.commit.interval.ms = 5000
           auto.offset.reset = latest
           bootstrap.servers = [streaming-kafka-broker-1:9092, 
streaming-kafka-broker-2:9092, streaming-kafka-broker-3:9092]
           check.crcs = true
           client.id =
           connections.max.idle.ms = 540000
           default.api.timeout.ms = 60000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = hudi_testing
           heartbeat.interval.ms = 3000
           interceptor.classes = []
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 30000
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           send.buffer.bytes = 131072
           session.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
           ssl.endpoint.identification.algorithm = null
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLS
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           value.deserializer = class fr24.ingest.ZlibDeserializerString
   
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'schemaprovider-class' was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.source.schema.file' was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.target.schema.file' was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.partitionpath.field' was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.recordkey.field' was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.topic' was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'source-class' was supplied but isn't a known config.
   20/03/04 10:21:02 INFO utils.AppInfoParser: Kafka version : 2.0.0-cdh6.1.0
   20/03/04 10:21:02 INFO utils.AppInfoParser: Kafka commitId : null
   20/03/04 10:21:02 INFO clients.Metadata: Cluster ID: h7B3MAm8TLumIZQHVT802A
   20/03/04 10:21:02 INFO sources.JsonKafkaSource: About to read 619753 from 
Kafka for topic :fr-bru
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding enable.auto.commit to 
false for executor
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding auto.offset.reset to 
none for executor
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding executor group.id to 
spark-executor-hudi_testing
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes 
to 65536 see KAFKA-3135
   20/03/04 10:21:02 INFO spark.SparkContext: Starting job: isEmpty at 
DeltaSync.java:329
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Got job 8 (isEmpty at 
DeltaSync.java:329) with 1 output partitions
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 14 
(isEmpty at DeltaSync.java:329)
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Missing parents: List()
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[47] at map at SourceFormatAdapter.java:62), which has no missing parents
   20/03/04 10:21:02 INFO memory.MemoryStore: Block broadcast_9 stored as 
values in memory (estimated size 5.5 KB, free 366.2 MB)
   20/03/04 10:21:02 INFO memory.MemoryStore: Block broadcast_9_piece0 stored 
as bytes in memory (estimated size 3.3 KB, free 366.2 MB)
   20/03/04 10:21:02 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on biedva-worker-1.bigdatapoc.local:34716 (size: 3.3 KB, free: 366.3 MB)
   20/03/04 10:21:02 INFO spark.SparkContext: Created broadcast 9 from 
broadcast at DAGScheduler.scala:1164
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 14 (MapPartitionsRDD[47] at map at SourceFormatAdapter.java:62) (first 15 tasks are for partitions Vector(0))
   20/03/04 10:21:02 INFO cluster.YarnScheduler: Adding task set 14.0 with 1 
tasks
   20/03/04 10:21:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 14.0 (TID 10502, biedva-worker-6.bigdatapoc.local, executor 7, partition 0, PROCESS_LOCAL, 7765 bytes)
   20/03/04 10:21:02 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on biedva-worker-6.bigdatapoc.local:34194 (size: 3.3 KB, free: 366.3 MB)
   20/03/04 10:21:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 14.0 (TID 10502, biedva-worker-6.bigdatapoc.local, executor 7): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {fr-bru-0=0}
           at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
           at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1261)
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1189)
           at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
           at 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
           at 
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
           at 
org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
           at 
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:212)
           at 
org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
           at 
org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
           at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
           at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
           at scala.collection.Iterator$class.foreach(Iterator.scala:891)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
           at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
           at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
           at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
           at 
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
           at scala.collection.AbstractIterator.to(Iterator.scala:1334)
           at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
           at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
           at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
           at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
           at 
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
           at 
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
           at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
           at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:121)
           at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   
