[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588775163



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider implements Lock, AutoCloseable {
+
+  private static final Logger LOG = LogManager.getLogger(LockProvider.class);
+
+  protected LockConfiguration lockConfiguration;
+
+  @Override
+  public final void lockInterruptibly() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final void lock() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final boolean tryLock() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+try {
+  return tryLock(time, unit);
+} catch (Exception e) {
+  throw new HoodieLockException(e);
+}
+  }
+
+  public T getLock() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public final Condition newCondition() {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() {
+try {
+  close();

Review comment:
   Is this a recursive call? `close()` calls itself here, so won't it recurse infinitely?

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Pluggable lock implementations using this provider class.
+ */
+public abstract class LockProvider implements Lock, AutoCloseable {

Review comment:
   We can always use an interface with default methods instead, right?

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/lock/LockProvider.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 

[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588774464



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  // Only a single writer can perform write ops
+  SINGLE_WRITER("single_writer"),
+  // Multiple writer can perform write ops with lazy conflict resolution using 
locks
+  OPTIMISTIC_CONCURRENCY_CONTROL("optimistic_concurrency_control");
+
+  private final String value;
+
+  WriteConcurrencyMode(String value) {
+this.value = value;
+  }
+
+  /**
+   * Getter for write concurrency mode.
+   * @return
+   */
+  public String value() {
+return value;
+  }
+
+  /**
+   * Convert string value to WriteConcurrencyMode.
+   */
+  public static WriteConcurrencyMode fromValue(String value) {
+switch (value.toLowerCase(Locale.ROOT)) {
+  case "single_writer":
+return SINGLE_WRITER;
+  case "optimistic_concurrency_control":
+return OPTIMISTIC_CONCURRENCY_CONTROL;
+  default:
+throw new HoodieException("Invalid value of Type.");
+}
+  }
+
+  public boolean supportsOptimisticConcurrencyControl() {

Review comment:
   I was thinking from the perspective that you are just doing a direct
comparison, hence the `isXXX` naming. But I see your point; let's keep it as is.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588771691



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
##
@@ -222,6 +225,17 @@ protected void commit(Option> 
extraMetadata, HoodieWriteMeta
 LOG.info("Committing metadata bootstrap !!");
   }
 
+  @Override
+  protected void syncTableMetadata() {

Review comment:
   Can we please file a ticket for removing this auto-commit logic? It's
kind of messy.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588685863



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import 
org.apache.hudi.client.lock.SimpleConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ConflictResolutionStrategy;
+import org.apache.hudi.client.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.lock.LockProvider;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;

Review comment:
   But could you explain why even `HiveMetastoreLockProvider` has to
be in hudi-common? That was my main point. We would be needlessly increasing the
weight of hudi-common, which is picked up by, e.g., hudi-hadoop-mr.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588684413



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -43,7 +43,8 @@
   public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
   public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
   public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async";
-
+  // Turn on inline cleaning
+  public static final String INLINE_CLEAN_PROP = "hoodie.clean.inline";

Review comment:
   Sure. The idea is to avoid proliferating configs, since each new one becomes one more
config to take care of.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588682179



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
+import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to 
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able 
to use this lock.
+ */
+@NotThreadSafe
+public class ZookeeperBasedLockProvider extends LockProvider {
+
+  private static final Logger LOG = 
LogManager.getLogger(ZookeeperBasedLockProvider.class);
+
+  private final CuratorFramework curatorFrameworkClient;
+  private volatile InterProcessMutex lock = null;
+
+  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, 
final Configuration conf) {
+checkRequiredProps(lockConfiguration);
+this.lockConfiguration = lockConfiguration;
+this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
+
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
+.retryPolicy(new 
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
+5000, 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
+
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP,
 DEFAULT_ZK_SESSION_TIMEOUT_MS))
+
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP,
 DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+.build();
+this.curatorFrameworkClient.start();
+  }
+
+  // Only used for testing
+  public ZookeeperBasedLockProvider(
+  final LockConfiguration lockConfiguration, final CuratorFramework 
curatorFrameworkClient) {
+checkRequiredProps(lockConfiguration);
+this.lockConfiguration = lockConfiguration;
+this.curatorFrameworkClient = curatorFrameworkClient;
+synchronized (this.curatorFrameworkClient) {
+  if (this.curatorFrameworkClient.getState() != 
CuratorFrameworkState.STARTED) {
+this.curatorFrameworkClient.start();
+  }
+}
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
+try {
+  acquireLock(time, 

[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588672526



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream getCandidateInstants(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant,
+ Option 
lastSuccessfulInstant) {
+
+// To find which instants are conflicting, we apply the following logic
+// 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
+// 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
+// after the current instant. We need to check for write conflicts since 
they may have mutated the same files
+// that are being newly created by the current write.
+// NOTE that any commits from table services such as compaction, 
clustering or cleaning since the
+// overlapping of files is handled using MVCC.
+Stream completedCommitsInstantStream = activeTimeline
+.getCommitsTimeline()
+.filterCompletedInstants()
+.findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : HoodieTimeline.INIT_INSTANT_TS)
+.getInstants();
+
+Stream compactionAndClusteringTimeline = activeTimeline
+.getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, 
COMPACTION_ACTION))
+.findInstantsAfter(currentInstant.getTimestamp())
+.getInstants();
+return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, 
HoodieCommitOperation otherOperation) {
+// TODO : UUID's can clash even for insert/insert, handle that case.
+Set fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+Set fileIdsSetForSecondInstant = 
otherOperation.getMutatedFileIds();
+Set intersection = new HashSet<>(fileIdsSetForFirstInstant);
+intersection.retainAll(fileIdsSetForSecondInstant);
+if (!intersection.isEmpty()) {
+  LOG.warn("Found conflicting writes between first operation = " + 
thisOperation
+  + ", second operation = " + otherOperation + " , intersecting file 
ids " + intersection);
+  return true;
+}
+return false;
+  }
+
+  @Override
+  public Option resolveConflict(HoodieTable table,
+  HoodieCommitOperation thisOperation, HoodieCommitOperation 
otherOperation) {
+// Since compaction is eventually written as commit, we need to ensure
+// we handle this during conflict resolution and not treat the commit 

[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588662387



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream getCandidateInstants(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant,
+ Option 
lastSuccessfulInstant) {
+
+// To find which instants are conflicting, we apply the following logic
+// 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
+// 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
+// after the current instant. We need to check for write conflicts since 
they may have mutated the same files
+// that are being newly created by the current write.
+// NOTE that any commits from table services such as compaction, 
clustering or cleaning since the
+// overlapping of files is handled using MVCC.
+Stream completedCommitsInstantStream = activeTimeline
+.getCommitsTimeline()
+.filterCompletedInstants()
+.findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : HoodieTimeline.INIT_INSTANT_TS)
+.getInstants();
+
+Stream compactionAndClusteringTimeline = activeTimeline
+.getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, 
COMPACTION_ACTION))
+.findInstantsAfter(currentInstant.getTimestamp())
+.getInstants();
+return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, 
HoodieCommitOperation otherOperation) {
+// TODO : UUID's can clash even for insert/insert, handle that case.
+Set fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+Set fileIdsSetForSecondInstant = 
otherOperation.getMutatedFileIds();
+Set intersection = new HashSet<>(fileIdsSetForFirstInstant);
+intersection.retainAll(fileIdsSetForSecondInstant);
+if (!intersection.isEmpty()) {
+  LOG.warn("Found conflicting writes between first operation = " + 
thisOperation
+  + ", second operation = " + otherOperation + " , intersecting file 
ids " + intersection);
+  return true;
+}
+return false;
+  }
+
+  @Override
+  public Option resolveConflict(HoodieTable table,
+  HoodieCommitOperation thisOperation, HoodieCommitOperation 
otherOperation) {
+// Since compaction is eventually written as commit, we need to ensure
+// we handle this during conflict resolution and not treat the commit 

[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588660702



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ZookeeperBasedLockProvider.java
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.curator.framework.CuratorFramework;

Review comment:
   In the bundles, we explicitly whitelist dependencies, so I'm not sure how
transitive dependencies would have been picked up. How are you testing all
this - not using the utilities/spark bundles?
   
   Standard practice for dependencies that are actually used in the project's
code is to declare the dependency explicitly. We should not depend on
hbase-server transitively bringing it in.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588656248



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/SimpleConcurrentFileWritesConflictResolutionStrategy.java
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ */
+public class SimpleConcurrentFileWritesConflictResolutionStrategy
+implements ConflictResolutionStrategy {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public Stream getInstantsStream(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant,
+ Option 
lastSuccessfulInstant) {
+
+// To find which instants are conflicting, we apply the following logic
+// 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
+// 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
+// after the current instant. We need to check for write conflicts since 
they may have mutated the same files
+// that are being newly created by the current write.
+Stream completedCommitsInstantStream = activeTimeline
+.getCommitsTimeline()
+// TODO : getWriteTimeline to ensure we include replace commits as well
+.filterCompletedInstants()
+.findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : "0")
+.getInstants();
+
+Stream compactionAndClusteringTimeline = activeTimeline
+.getTimelineOfActions(CollectionUtils.createSet(REPLACE_COMMIT_ACTION, 
COMPACTION_ACTION))
+.findInstantsAfter(currentInstant.getTimestamp())
+.getInstants();
+return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringTimeline);
+  }
+
+  @Override
+  public boolean hasConflict(HoodieCommitOperation thisOperation, 
HoodieCommitOperation otherOperation) {
+// TODO : UUID's can clash even for insert/insert, handle that case.
+Set fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
+Set fileIdsSetForSecondInstant = 
otherOperation.getMutatedFileIds();
+Set intersection = new HashSet<>(fileIdsSetForFirstInstant);
+intersection.retainAll(fileIdsSetForSecondInstant);
+if (!intersection.isEmpty()) {
+  LOG.error("Found conflicting writes between first operation = " + 
thisOperation

Review comment:
   `WARN` should indicate abnormal execution, so it may or may not fit here. I 
still think INFO is the cleanest, since this code is supposed to handle the 
conflicting case. For debugging, users can always turn it on. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please 

[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588655275



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/LockManager.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class wraps implementations of {@link LockProvider} and provides an 
easy way to manage the lifecycle of a lock.
+ */
+public class LockManager implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(LockManager.class);
+  private final HoodieWriteConfig writeConfig;
+  private final LockConfiguration lockConfiguration;
+  private final SerializableConfiguration hadoopConf;
+  private volatile LockProvider lockProvider;
+  // Holds the latest completed write instant to know which ones to check 
conflict against
+  private final AtomicReference> 
latestCompletedWriteInstant;
+
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
+this.writeConfig = writeConfig;
+this.hadoopConf = new SerializableConfiguration(fs.getConf());
+this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
+  }
+
+  public void lock() {
+if 
(writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+  LockProvider lockProvider = getLockProvider();
+  boolean acquired = false;
+  try {
+int retries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP);
+long waitTimeInMs = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP);
+int retryCount = 0;
+while (retryCount <= retries) {
+  acquired = 
lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), 
TimeUnit.MILLISECONDS);
+  if (acquired) {
+break;
+  }
+  LOG.info("Retrying...");
+  Thread.sleep(waitTimeInMs);
+  retryCount++;
+}
+  } catch (Exception e) {

Review comment:
   I think, for the retries, we should handle `InterruptedException` and 
continue retrying. That's what I was getting at. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588654337



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/HoodieCommitOperation.java
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieCommonMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * This class is used to hold all information used to identify how to resolve 
conflicts between instants.
+ * Since we interchange payload types between AVRO specific records and 
POJO's, this object serves as
+ * a common payload to manage these conversions.
+ */
+public class HoodieCommitOperation {

Review comment:
   So the thing is, this has the common metadata; overloading "commit" is 
always confusing in our code base, since `commit` often refers to a specific 
action type. Given it's used specifically in the conflict resolution scenario, 
I think something like `ConcurrentOperation` captures the intent. It's just 
concurrent to the current operation; the conflict resolution will determine if 
it's actually a conflict, like you mentioned. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588651406



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of 
strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+  /**
+   * Stream of instants to check conflicts against.
+   * @return
+   */
+  Stream getInstantsStream(HoodieActiveTimeline activeTimeline, 
HoodieInstant currentInstant, Option lastSuccessfulInstant);
+
+  /**
+   * Implementations of this method will determine whether a conflict exists 
between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  boolean hasConflict(HoodieCommitOperation thisOperation, 
HoodieCommitOperation otherOperation);
+
+  /**
+   * Implementations of this method will determine how to resolve a conflict 
between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  Option 
resolveConflict(Option metadataWriter, 
HoodieTable table,

Review comment:
   Let's remove it. We can introduce it as needed. This will simplify the 
implementation as it stands now. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588650200



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I 
preppedRecords, final String instan
* Common method containing steps to be performed before write 
(upsert/insert/...
* @param instantTime
* @param writeOperationType
+   * @param metaClient
*/
-  protected void preWrite(String instantTime, WriteOperationType 
writeOperationType) {
+  protected void preWrite(String instantTime, WriteOperationType 
writeOperationType,
+  HoodieTableMetaClient metaClient) {
 setOperationType(writeOperationType);
-syncTableMetadata();
+
this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+.lastInstant());
+LOG.info("Last Instant Cached by writer with instant " + instantTime + " 
is " + this.txnManager.getLastCompletedTransactionOwner());
+this.txnManager.setTransactionOwner(Option.of(new 
HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+this.txnManager.beginTransaction();
+try {
+  syncTableMetadata();
+} finally {
+  this.txnManager.endTransaction();

Review comment:
   I was wondering about the following scenario: if the cleanup fails, I 
guess it throws an error in both cases? If so, this is okay. Good to go.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588648870



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -359,10 +388,21 @@ public abstract O bulkInsertPreppedRecords(I 
preppedRecords, final String instan
* Common method containing steps to be performed before write 
(upsert/insert/...
* @param instantTime
* @param writeOperationType
+   * @param metaClient
*/
-  protected void preWrite(String instantTime, WriteOperationType 
writeOperationType) {
+  protected void preWrite(String instantTime, WriteOperationType 
writeOperationType,
+  HoodieTableMetaClient metaClient) {
 setOperationType(writeOperationType);
-syncTableMetadata();
+
this.txnManager.setLastCompletedTransaction(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+.lastInstant());
+LOG.info("Last Instant Cached by writer with instant " + instantTime + " 
is " + this.txnManager.getLastCompletedTransactionOwner());
+this.txnManager.setTransactionOwner(Option.of(new 
HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)));
+this.txnManager.beginTransaction();

Review comment:
   Understood. But if some calls are needed prior to this, then either the 
object constructor should take them or the method should take them as 
arguments. Or we have a transaction builder of sorts. 
   So I would say, if this is always the case, i.e. the setters are needed, then 
we change the beginTransaction() signature for good, not just add an overload. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-05 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r588647208



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -30,16 +31,19 @@
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableService;

Review comment:
   If it's an enum, it's okay. I thought it had code. It's still not a `model` per 
se, but we can do the move in a different PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-03-04 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r587049910



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -30,16 +31,19 @@
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;

Review comment:
   `lock` as a package name feels off to me. Can we have 
`org.apache.hudi.client.transaction.TransactionManager`? 
   Then `.lock` can be a sub-package under it, i.e. `.transaction.lock`. 

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -30,16 +31,19 @@
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.lock.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableService;

Review comment:
   So, the `model` package should just contain POJOs, i.e. data structure 
objects. Let's move `TableService` elsewhere.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -210,6 +231,11 @@ void emitCommitMetrics(String instantTime, 
HoodieCommitMetadata metadata, String
 }
   }
 
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+// no-op
+// TODO : Conflict resolution is not support for Flink,Java engines

Review comment:
   typo: not supported

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -797,7 +853,9 @@ public Boolean rollbackFailedWrites() {
* Performs a compaction operation on a table, serially before or after an 
insert/upsert action.
*/
   protected Option inlineCompact(Option> 
extraMetadata) {
-Option compactionInstantTimeOpt = 
scheduleCompaction(extraMetadata);
+String schedulingCompactionInstant = 
HoodieActiveTimeline.createNewInstantTime();

Review comment:
   rename: `compactionInstantTime` 

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/lock/ConflictResolutionStrategy.java
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.lock;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.stream.Stream;
+
+/**
+ * Strategy interface for conflict resolution with multiple writers.
+ * Users can provide pluggable implementations for different kinds of 
strategies to resolve conflicts when multiple
+ * writers are mutating the hoodie table.
+ */
+public interface ConflictResolutionStrategy {
+
+  /**
+   * Stream of instants to check conflicts against.
+   * @return
+   */
+  Stream getInstantsStream(HoodieActiveTimeline activeTimeline, 
HoodieInstant currentInstant, Option lastSuccessfulInstant);
+
+  /**
+   * Implementations of this method will determine whether a conflict exists 
between 2 commits.
+   * @param thisOperation
+   * @param otherOperation
+   * @return
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  boolean hasConflict(HoodieCommitOperation thisOperation, 

[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-02-02 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r568841211



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -188,6 +203,8 @@ public boolean commitStats(String instantTime, 
List stats, Opti
   postCommit(table, metadata, instantTime, extraMetadata);
   emitCommitMetrics(instantTime, metadata, commitActionType);
   LOG.info("Committed " + instantTime);
+  // Reset the last completed write instant
+  latestCompletedWriteInstant.set(null);

Review comment:
   How about an empty `Option` instead of null? In general, I'd like to avoid using `null` 
as any kind of sentinel.

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  NO_WRITER("no_writer"),

Review comment:
   What's `NO_WRITER`? It's kind of difficult to understand. Can we remove 
this?

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  NO_WRITER("no_writer"),
+  // Only a single writer can perform write ops
+  SINGLE_WRITER("single_writer"),
+  // Multiple writer can perform write ops with lazy conflict resolution using 
locks
+  
OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK("optimistic_concurrency_control_shared_lock");
+
+  private final String value;
+
+  WriteConcurrencyMode(String value) {
+this.value = value;
+  }
+
+  /**
+   * Getter for write concurrency mode.
+   * @return
+   */
+  public String value() {
+return value;
+  }
+
+  /**
+   * Convert string value to WriteConcurrencyMode.
+   */
+  public static WriteConcurrencyMode fromValue(String value) {
+switch (value.toLowerCase(Locale.ROOT)) {
+  case "no_writer":
+return NO_WRITER;
+  case "single_writer":
+return SINGLE_WRITER;
+  case "optimistic_concurrency_control_shared_lock":
+return OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK;
+  default:
+throw new HoodieException("Invalid value of Type.");
+}
+  }
+
+  public boolean isMultiWriter() {

Review comment:
   Rename to `supportsOptimisticConcurrencyControl()` to match the mode name. 

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -599,6 +637,7 @@ public HoodieRestoreMetadata restoreToInstant(final String 
instantTime) throws H
   public HoodieCleanMetadata clean(String cleanInstantTime) throws 
HoodieIOException {
 LOG.info("Cleaner started");
 final Timer.Context timerContext = metrics.getCleanCtx();
+scheduleCleaningAtInstant(cleanInstantTime, Option.empty());

Review comment:
   Why was this change needed?

##
File path: 

[GitHub] [hudi] vinothchandar commented on a change in pull request #2374: [HUDI-845] Added locking capability to allow multiple writers

2021-02-02 Thread GitBox


vinothchandar commented on a change in pull request #2374:
URL: https://github.com/apache/hudi/pull/2374#discussion_r568841211



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -188,6 +203,8 @@ public boolean commitStats(String instantTime, 
List stats, Opti
   postCommit(table, metadata, instantTime, extraMetadata);
   emitCommitMetrics(instantTime, metadata, commitActionType);
   LOG.info("Committed " + instantTime);
+  // Reset the last completed write instant
+  latestCompletedWriteInstant.set(null);

Review comment:
   How about an empty `Option` instead of null? In general, I'd like to avoid using `null` 
as any kind of sentinel.

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  NO_WRITER("no_writer"),

Review comment:
   What's `NO_WRITER`? It's kind of difficult to understand. Can we remove 
this?

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteConcurrencyMode.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Locale;
+
+/**
+ * Different concurrency modes for write operations.
+ */
+public enum WriteConcurrencyMode {
+  NO_WRITER("no_writer"),
+  // Only a single writer can perform write ops
+  SINGLE_WRITER("single_writer"),
+  // Multiple writer can perform write ops with lazy conflict resolution using 
locks
+  
OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK("optimistic_concurrency_control_shared_lock");
+
+  private final String value;
+
+  WriteConcurrencyMode(String value) {
+this.value = value;
+  }
+
+  /**
+   * Getter for write concurrency mode.
+   * @return
+   */
+  public String value() {
+return value;
+  }
+
+  /**
+   * Convert string value to WriteConcurrencyMode.
+   */
+  public static WriteConcurrencyMode fromValue(String value) {
+switch (value.toLowerCase(Locale.ROOT)) {
+  case "no_writer":
+return NO_WRITER;
+  case "single_writer":
+return SINGLE_WRITER;
+  case "optimistic_concurrency_control_shared_lock":
+return OPTIMISTIC_CONCURRENCY_CONTROL_SHARED_LOCK;
+  default:
+throw new HoodieException("Invalid value of Type.");
+}
+  }
+
+  public boolean isMultiWriter() {

Review comment:
   Rename to `supportsOptimisticConcurrencyControl()` to match the mode name.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -599,6 +637,7 @@ public HoodieRestoreMetadata restoreToInstant(final String 
instantTime) throws H
   public HoodieCleanMetadata clean(String cleanInstantTime) throws 
HoodieIOException {
 LOG.info("Cleaner started");
 final Timer.Context timerContext = metrics.getCleanCtx();
+scheduleCleaningAtInstant(cleanInstantTime, Option.empty());

Review comment:
   Why was this change needed?

##
File path: