[ 
https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269056#comment-17269056
 ] 

Konstantine Karantasis commented on KAFKA-12164:
------------------------------------------------

Thanks for reporting [~kaushik srinivas] 

First, I need to note that this issue seems to belong in its entirety to:

[https://github.com/confluentinc/kafka-connect-hdfs/issues] 

given that the behavior is not affected by the guarantees that the Kafka 
Connect framework provides. 

Briefly, and to state the obvious: from a worker's perspective, restarts 
(graceful or not) can happen at any time. 
Additionally, the connector does not provide any guarantees with respect to the 
atomicity of the nested directory structure. Things become more challenging 
given that the partitioner, as you mentioned, is custom. 

Now, from a quick read of the javadoc for FileSystem#mkdirs in HDFS, I understand 
that a nested directory can be constructed from a given path even if some of 
the parents already exist. This makes me think that a restart would allow the 
creation of the full path, as long as the partitioner is deterministic with 
respect to the record (i.e. for a given record it always creates the same 
path). Upon a restart the export of a record will be retried, so the 
partitioner could reattempt the creation of the full path. But that's an early 
assumption on my side. 
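
To make that assumption concrete, here is a rough sketch of the retry 
behavior I have in mind. It is not the connector's code and it does not touch 
HDFS: it uses java.nio.file.Files.createDirectories on the local file system 
as a stand-in for FileSystem#mkdirs, assuming both create any missing parents 
and accept parents that already exist. The class name MkdirsRetrySketch and 
the helpers encodePartition and ensurePartitionDir are hypothetical names 
made up for this illustration. 

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only; java.nio.file.Path is not Hadoop's Path.
public class MkdirsRetrySketch {

    // Stand-in for a deterministic custom partitioner: the same field values,
    // in the same order, always yield the same relative path,
    // e.g. {test1=1, test2=2} -> "test1=1/test2=2".
    static String encodePartition(Map<String, String> fields) {
        return fields.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("/"));
    }

    // Local-file-system analog of fs.mkdirs(new Path(filename)):
    // creates every missing level and does not fail on levels that exist.
    static Path ensurePartitionDir(Path root, Map<String, String> fields)
            throws IOException {
        return Files.createDirectories(root.resolve(encodePartition(fields)));
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("mkdirs-retry");

        // LinkedHashMap keeps insertion order, so the path is reproducible.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("test1", "1");
        fields.put("test2", "2");

        // Simulate a restart that left only the first level behind.
        Files.createDirectory(root.resolve("test1=1"));

        // Retry of the same record after the restart: the missing child
        // directory is created under the existing parent.
        Path full = ensurePartitionDir(root, fields);
        System.out.println(full + " exists: " + Files.isDirectory(full));

        // A further retry finds the full path in place and changes nothing.
        ensurePartitionDir(root, fields);
    }
}
```

Whether the real connector ends up on an equivalent code path after a restart 
is exactly what would need to be confirmed in the connector's repository. 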

Shall we take the discussion to the appropriate forum? 
https://github.com/confluentinc/kafka-connect-hdfs/issues

> Issue when kafka connect worker pod restart, during creation of nested 
> partition directories in hdfs file system.
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: kaushik srinivas
>            Priority: Critical
>
> In our production labs, an issue is observed. Below is the sequence of the 
> same.
>  # hdfs connector is added to the connect worker.
>  # hdfs connector is creating folders in hdfs /test1=1/test2=2/
> Based on the custom partitioner. Here test1 and test2 are two separate nested 
> directories derived from multiple fields in the record using a custom 
> partitioner.
>  # Now kafka connect hdfs connector uses below function calls to create the 
> directories in the hdfs file system.
> fs.mkdirs(new Path(filename));
> ref: 
> [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java]
> Now the important thing to note is that if mkdirs() is a non-atomic operation 
> (i.e. it can result in partial execution if interrupted), 
> then suppose the first directory, i.e. test1, is created and the connect 
> worker pod restarts just before test2 is created in hdfs. 
> The hdfs file system will then be left with partially created folders for 
> the partitions being written at the time of the restart.
> So we might have conditions in hdfs as below
> /test1=0/test2=0/
> /test1=1/
> /test1=2/test2=2
> /test1=3/test2=3
> So the second partition has a missing directory in it. And if hive 
> integration is enabled, hive metastore exceptions will occur since a 
> partition expected by the hive table is missing for a few partitions in hdfs.
> *This can occur with any connector that has an ongoing non-atomic operation 
> when a restart of the kafka connect worker pod is triggered. This will result 
> in some partially completed states in the system and may cause issues for the 
> connector to continue its operation*.
> *This is a very critical issue and needs some attention on ways for handling 
> the same.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
