ibuenros commented on a change in pull request #2563: [GOBBLIN-691] Make
format-specific component pluggable in compaction
URL: https://github.com/apache/incubator-gobblin/pull/2563#discussion_r261408097
##
File path:
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
##
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.math3.primes.Primes;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+
+/**
+ * Configurator for compaction job.
+ * Each data format should provide its own implementation of this abstract class.
+ *
+ */
+@Slf4j
+public abstract class CompactionJobConfigurator {
+
+ /** Configuration key naming the {@link ConfiguratorFactory} class that instantiateConfigurator loads. */
+ public static final String COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY =
"compaction.jobConfiguratorFactory.class";
+ /** Factory class name used when COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY is not set in the state. */
+ public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS
=
+
"org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";
+
+
+ /**
+  * File extensions declared for compaction data formats: {@code "avro"} and {@code "orc"}.
+  *
+  * Lombok's {@code @AllArgsConstructor} supplies the constructor taking the extension string and
+  * {@code @Getter} exposes it as {@code getExtensionString()}.
+  */
+ @Getter
+ @AllArgsConstructor
+ protected enum EXTENSION {
+AVRO("avro"), ORC("orc");
+
+ // Extension text without a leading dot, as written in the constants above.
+private String extensionString;
+ }
+
+ // Job state handed to the constructor.
+ protected final State state;
+
+ // File system obtained from getFileSystem(state) at construction time.
+ @Getter
+ protected final FileSystem fs;
+
+ // Below attributes are MR related
+ // MapReduce job held by this configurator; it is not assigned in the constructor.
+ @Getter
+ protected Job configuredJob;
+ // Value of MRCompactor.COMPACTION_SHOULD_DEDUPLICATE read at construction, with a default of true.
+ @Getter
+ protected final boolean shouldDeduplicate;
+ // NOTE(review): presumably the output directory of the MR job - confirm where it is assigned.
+ @Getter
+ protected Path mrOutputPath = null;
+ // NOTE(review): presumably set to true once the MR job has been created - confirm.
+ @Getter
+ protected boolean isJobCreated = false;
+ // NOTE(review): raw Collection; elements are presumably the MR job's input Paths - confirm and parameterize.
+ @Getter
+ protected Collection mapReduceInputPaths = null;
+ // NOTE(review): presumably a record count derived from input file names - confirm.
+ @Getter
+ protected long fileNameRecordCount = 0;
+
+ /**
+  * Factory that builds a {@link CompactionJobConfigurator} from a job {@link State}.
+  *
+  * Implementations are loaded by class name and instantiated reflectively in
+  * instantiateConfigurator, so they need an accessible no-argument constructor.
+  */
+ public interface ConfiguratorFactory {
+ // Creates the configurator for the given state; may fail with IOException.
+CompactionJobConfigurator createConfigurator(State state) throws
IOException;
+ }
+
+ /**
+  * Initializes the fields common to every configurator.
+  *
+  * Stores {@code state}, resolves the file system through {@code getFileSystem(state)}, and reads
+  * {@code MRCompactor.COMPACTION_SHOULD_DEDUPLICATE} (default {@code true}) into {@code shouldDeduplicate}.
+  *
+  * @param state job state the configurator reads its settings from
+  * @throws IOException NOTE(review): presumably raised by getFileSystem - confirm against its definition
+  */
+ public CompactionJobConfigurator(State state) throws IOException {
+this.state = state;
+this.fs = getFileSystem(state);
+this.shouldDeduplicate =
state.getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
+ }
+
+ public static CompactionJobConfigurator instantiateConfigurator(State state)
{
+String compactionConfiguratorFactoryClass =
+state.getProp(COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS);
+try {
+ return Class.forName(compactionConfiguratorFactoryClass)
+ .asSubclass(ConfiguratorFactory.class)
+ .newInstance()
+ .createConfigurator(state);
+} catch (ReflectiveOperationException | IOException e) {
+ throw new RuntimeException("Failed to instantiate a instance of job
configurator:", e);
+}
+ }
+
+ /**
+  * Returns the file extension for the data format handled by the concrete configurator.
+  * NOTE(review): presumably matches one of the EXTENSION strings ("avro", "orc") - confirm in subclasses.
+  */
+ public abstract String getFileExtension();
+
+ /**
+ * Customized MR job