[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6046: [HUDI-4363] Support Clustering row writer to improve performance

2022-09-21 Thread GitBox


alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r977019700


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##
@@ -275,6 +345,66 @@ private HoodieData> 
readRecordsForGroupBaseFiles(JavaSparkContex
 .map(record -> transform(record, writeConfig)));
   }
 
+  /**
+   * Get dataset of all records for the group. This includes all records from 
file slice (Apply updates from log files, if any).
+   */
+  private Dataset readRecordsForGroupAsRow(JavaSparkContext jsc,
+HoodieClusteringGroup 
clusteringGroup,
+String instantTime) {
+List clusteringOps = 
clusteringGroup.getSlices().stream()
+.map(ClusteringOperation::create).collect(Collectors.toList());
+boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+SQLContext sqlContext = new SQLContext(jsc.sc());
+
+Path[] baseFilePaths = clusteringOps
+.stream()
+.map(op -> {
+  ArrayList readPaths = new ArrayList<>();
+  if (op.getBootstrapFilePath() != null) {
+readPaths.add(op.getBootstrapFilePath());
+  }
+  if (op.getDataFilePath() != null) {
+readPaths.add(op.getDataFilePath());
+  }
+  return readPaths;
+})
+.flatMap(Collection::stream)
+.filter(path -> !path.isEmpty())
+.map(Path::new)
+.toArray(Path[]::new);
+
+HashMap params = new HashMap<>();
+params.put("hoodie.datasource.query.type", "snapshot");
+params.put("as.of.instant", instantTime);
+
+Path[] paths;
+if (hasLogFiles) {
+  String compactionFractor = 
Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
+  .orElse("0.75");
+  params.put("compaction.memory.fraction", compactionFractor);
+
+  Path[] deltaPaths = clusteringOps
+  .stream()
+  .filter(op -> !op.getDeltaFilePaths().isEmpty())
+  .flatMap(op -> op.getDeltaFilePaths().stream())
+  .map(Path::new)
+  .toArray(Path[]::new);
+  paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
+} else {
+  paths = baseFilePaths;
+}
+
+String readPathString = String.join(",", 
Arrays.stream(paths).map(Path::toString).toArray(String[]::new));
+params.put("hoodie.datasource.read.paths", readPathString);
+// Building HoodieFileIndex needs this param to decide query path
+params.put("glob.paths", readPathString);
+
+// Let Hudi relations to fetch the schema from the table itself
+BaseRelation relation = SparkAdapterSupport$.MODULE$.sparkAdapter()

Review Comment:
   :+1:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6046: [HUDI-4363] Support Clustering row writer to improve performance

2022-09-19 Thread GitBox


alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r974679847


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##
@@ -105,6 +108,52 @@ object HoodieDatasetBulkInsertHelper extends Logging {
 partitioner.repartitionRecords(trimmedDF, 
config.getBulkInsertShuffleParallelism)
   }
 
+  /**
+   * Perform bulk insert for [[Dataset]], will not change timeline/index, 
return
+   * information about write files.
+   */
+  def bulkInsert(dataset: Dataset[Row],
+ instantTime: String,
+ table: HoodieTable[_ <: HoodieRecordPayload[_ <: 
HoodieRecordPayload[_ <: AnyRef]], _, _, _],
+ writeConfig: HoodieWriteConfig,
+ partitioner: BulkInsertPartitioner[Dataset[Row]],
+ parallelism: Int,
+ shouldPreserveHoodieMetadata: Boolean): 
HoodieData[WriteStatus] = {
+val repartitionedDataset = partitioner.repartitionRecords(dataset, 
parallelism)
+val arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted
+val schema = dataset.schema
+val writeStatuses = 
repartitionedDataset.queryExecution.toRdd.mapPartitions(iter => {
+  val taskContextSupplier: TaskContextSupplier = 
table.getTaskContextSupplier
+  val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
+  val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
+  val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
+  val writer = new BulkInsertDataInternalWriterHelper(
+table,
+writeConfig,
+instantTime,
+taskPartitionId,
+taskId,
+taskEpochId,
+schema,
+writeConfig.populateMetaFields,
+arePartitionRecordsSorted,
+shouldPreserveHoodieMetadata)
+
+  try {
+iter.foreach(writer.write)
+  } catch {
+case t: Throwable =>
+  writer.abort()
+  throw t
+  } finally {
+writer.close()
+  }
+
+  writer.getWriteStatuses.asScala.map(_.toWriteStatus).iterator
+}).collect()
+table.getContext.parallelize(writeStatuses.toList.asJava)

Review Comment:
   nit: no need for `toList`



##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkAdapterSupport.scala:
##
@@ -26,17 +26,6 @@ import org.apache.spark.sql.hudi.SparkAdapter
  */
 trait SparkAdapterSupport {
 
-  lazy val sparkAdapter: SparkAdapter = {

Review Comment:
   Instead of moving this to Java let's dot he following:
   
   - Create companion object `ScalaAdapterSupport` 
   - Move this conditional there
   - Keep this var (for compatibility) referencing static one from the object



##
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java:
##
@@ -183,8 +183,17 @@ public boolean accept(Path path) {
 metaClientCache.put(baseDir.toString(), metaClient);
   }
 
-  fsView = 
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
-  metaClient, 
HoodieInputFormatUtils.buildMetadataConfig(getConf()));
+  if (getConf().get("as.of.instant") != null) {

Review Comment:
   Good catch!



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java:
##
@@ -91,27 +81,4 @@ public JavaRDD> 
repartitionRecords(JavaRDD> reco
   return hoodieRecord;
 });
   }
-
-  private Dataset reorder(Dataset dataset, int numOutputGroups) {

Review Comment:
   Thanks for cleaning that up!



##
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java:
##
@@ -60,20 +61,30 @@ class TestHoodieSparkMergeOnReadTableClustering extends 
SparkClientFunctionalTes
 
   private static Stream testClustering() {
 return Stream.of(
-Arguments.of(true, true, true),
-Arguments.of(true, true, false),
-Arguments.of(true, false, true),
-Arguments.of(true, false, false),
-Arguments.of(false, true, true),
-Arguments.of(false, true, false),
-Arguments.of(false, false, true),
-Arguments.of(false, false, false)
-);
+Arrays.asList(true, true, true),
+Arrays.asList(true, true, false),
+Arrays.asList(true, false, true),
+Arrays.asList(true, false, false),
+Arrays.asList(false, true, true),
+Arrays.asList(false, true, false),
+Arrays.asList(false, false, true),
+Arrays.asList(false, false, false))
+.flatMap(arguments -> {
+  ArrayList enableRowClusteringArgs = new ArrayList<>();
+  enableRowClusteringArgs.add(true);
+  enableRowClusteringArgs.addAll(arguments);
+  ArrayList disableRowClusteringArgs = new ArrayList<>();
+  disableRowClusteringArgs.add(false);

Review Comment

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6046: [HUDI-4363] Support Clustering row writer to improve performance

2022-09-14 Thread GitBox


alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r971302485


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##
@@ -273,6 +330,60 @@ private HoodieData> 
readRecordsForGroupBaseFiles(JavaSparkContex
 .map(record -> transform(record, writeConfig)));
   }
 
+  /**
+   * Get dataset of all records for the group. This includes all records from 
file slice (Apply updates from log files, if any).
+   */
+  private Dataset readRecordsForGroupAsRow(JavaSparkContext jsc,
+   HoodieClusteringGroup 
clusteringGroup,
+   String instantTime) {
+List clusteringOps = 
clusteringGroup.getSlices().stream()
+.map(ClusteringOperation::create).collect(Collectors.toList());
+boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+SQLContext sqlContext = new SQLContext(jsc.sc());
+
+String[] baseFilePaths = clusteringOps
+.stream()
+.map(op -> {
+  ArrayList readPaths = new ArrayList<>();
+  if (op.getBootstrapFilePath() != null) {
+readPaths.add(op.getBootstrapFilePath());
+  }
+  if (op.getDataFilePath() != null) {
+readPaths.add(op.getDataFilePath());
+  }
+  return readPaths;
+})
+.flatMap(Collection::stream)
+.filter(path -> !path.isEmpty())
+.toArray(String[]::new);
+String[] deltaPaths = clusteringOps
+.stream()
+.filter(op -> !op.getDeltaFilePaths().isEmpty())
+.flatMap(op -> op.getDeltaFilePaths().stream())
+.toArray(String[]::new);
+
+Dataset inputRecords;
+if (hasLogFiles) {
+  String compactionFractor = 
Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
+  .orElse("0.75");
+  String[] paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
+  inputRecords = sqlContext.read()

Review Comment:
   Good call. We shouldn't be moving any of these classes, we can use 
`SparkAdapter` to provide us w/ an interface to access it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6046: [HUDI-4363] Support Clustering row writer to improve performance

2022-09-14 Thread GitBox


alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r971287163


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##
@@ -52,6 +55,27 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table,
 super(table, engineContext, writeConfig);
   }
 
+  @Override
+  public HoodieData performClusteringWithRecordsRow(Dataset 
inputRecords, int numOutputGroups,
+ String 
instantTime, Map strategyParams, Schema schema,
+ 
List fileGroupIdList, boolean preserveHoodieMetadata) {
+LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups 
+ " commit:" + instantTime);
+HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+.withBulkInsertParallelism(numOutputGroups)
+.withProps(getWriteConfig().getProps()).build();
+
+boolean shouldPreserveHoodieMetadata = preserveHoodieMetadata;

Review Comment:
   That should be decided wherever we use it. But we shouldn't be overriding 
one w/ another here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6046: [HUDI-4363] Support Clustering row writer to improve performance

2022-09-14 Thread GitBox


alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r971286665


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##
@@ -148,29 +184,34 @@ protected BulkInsertPartitioner>> 
getPartitioner(Map> 
runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map strategyParams,
- 
boolean preserveHoodieMetadata, String instantTime) {
+  private CompletableFuture> 
runClusteringForGroupAsyncWithRDD(HoodieClusteringGroup clusteringGroup, 
Map strategyParams,

Review Comment:
   RDD suffix is misleading though (Row based ones are also relying on RDD 
internally)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6046: [HUDI-4363] Support Clustering row writer to improve performance

2022-09-12 Thread GitBox


alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r969043938


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##
@@ -113,6 +129,15 @@ public HoodieWriteMetadata> 
performClustering(final Hood
 return writeMetadata;
   }
 
+  /**
+   * Execute clustering to write inputRecords into new files based on 
strategyParams.
+   * Different from {@link performClusteringWithRecordsRDD}, this method take 
{@link Dataset}
+   * as inputs.
+   */
+  public abstract HoodieData 
performClusteringWithRecordsRow(final Dataset inputRecords, final int 
numOutputGroups, final String instantTime,

Review Comment:
   This style of wrapping (while acceptable under recognized code style-guide) 
makes it quite hard to read on laptop screen: 
   
   https://user-images.githubusercontent.com/428277/189783507-51e4138e-e9fa-48ba-8b54-a798a31c200c.png";>
   



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner implements 
BulkInsertPartitioner> {
+
+  private final String[] orderByColumns;

Review Comment:
   Let's extract common base class `SpatialCurveSortPartitionerBase` so that we 
can reuse as much code as possible and avoid duplication



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##
@@ -98,10 +106,18 @@ public HoodieWriteMetadata> 
performClustering(final Hood
 // execute clustering for each group async and collect WriteStatus
 Stream> writeStatusesStream = FutureUtils.allOf(
 clusteringPlan.getInputGroups().stream()
-.map(inputGroup -> runClusteringForGroupAsync(inputGroup,
-clusteringPlan.getStrategy().getStrategyParams(),
-
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-instantTime))
+.map(inputGroup -> {
+  if 
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
 false)) {
+return runClusteringForGroupAsyncWithRow(inputGroup,
+clusteringPlan.getStrategy().getStrategyParams(),
+
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),

Review Comment:
   Please extract common expression (to `shouldPreserveMetadata`)



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -1084,7 +1084,7 @@ public boolean isEmbeddedTimelineServerEnabled() {
   }
 
   public boolean isEmbeddedTimelineServerReuseEnabled() {
-return 
Boolean.parseBoolean(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED));
+return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);

Review Comment:
   We need to do `getBooleanOrDefault`, otherwise it might NPE (due to unboxing)



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertRowWriter.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in w

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6046: [HUDI-4363] Support Clustering row writer to improve performance

2022-08-24 Thread GitBox


alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r951930671


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##
@@ -98,10 +110,18 @@ public HoodieWriteMetadata> 
performClustering(final Hood
 // execute clustering for each group async and collect WriteStatus
 Stream> writeStatusesStream = FutureUtils.allOf(
 clusteringPlan.getInputGroups().stream()
-.map(inputGroup -> runClusteringForGroupAsync(inputGroup,
-clusteringPlan.getStrategy().getStrategyParams(),
-
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-instantTime))
+.map(inputGroup -> {
+  if 
(Boolean.parseBoolean(getWriteConfig().getString(HoodieClusteringConfig.CLUSTERING_AS_ROW)))
 {

Review Comment:
   Let's abstract this as a method in `WriteConfig`



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner extends 
RowCustomColumnsSortPartitioner {

Review Comment:
   Why do we inherit from `RowCustomColumnsSortPartitioner`



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##
@@ -273,6 +398,62 @@ private HoodieData> 
readRecordsForGroupBaseFiles(JavaSparkContex
 .map(record -> transform(record, writeConfig)));
   }
 
+  /**
+   * Get dataset of all records for the group. This includes all records from 
file slice (Apply updates from log files, if any).
+   */
+  private Dataset readRecordsForGroupAsRow(JavaSparkContext jsc,
+   HoodieClusteringGroup 
clusteringGroup,
+   String instantTime) {
+List clusteringOps = 
clusteringGroup.getSlices().stream()
+.map(ClusteringOperation::create).collect(Collectors.toList());
+boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+SQLContext sqlContext = new SQLContext(jsc.sc());
+
+String[] baseFilePaths = clusteringOps
+.stream()
+.map(op -> {
+  ArrayList pairs = new ArrayList<>();
+  if (op.getBootstrapFilePath() != null) {
+pairs.add(op.getBootstrapFilePath());
+  }
+  if (op.getDataFilePath() != null) {
+pairs.add(op.getDataFilePath());
+  }
+  return pairs;
+})
+.flatMap(Collection::stream)
+.filter(path -> !path.isEmpty())
+.toArray(String[]::new);
+String[] deltaPaths = clusteringOps
+.stream()
+.filter(op -> !op.getDeltaFilePaths().isEmpty())
+.flatMap(op -> op.getDeltaFilePaths().stream())
+.toArray(String[]::new);
+
+Dataset inputRecords;
+if (hasLogFiles) {
+  String compactionFractor = 
Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
+  .orElse("0.75");
+  String[] paths = new String[baseFilePaths.length + deltaPaths.length];
+  System.arraycopy(baseFilePaths, 0, paths, 0, baseFilePaths.length);
+  System.arraycopy(deltaPaths, 0, paths, baseFilePaths.length, 
deltaPaths.length);

Review Comment:
   You can use `CollectionUtils.combine`



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatusCoordinator.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional inf