[
https://issues.apache.org/jira/browse/APEXMALHAR-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205991#comment-15205991
]
ASF GitHub Bot commented on APEXMALHAR-2013:
--------------------------------------------
Github user DT-Priyanka commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/216#discussion_r56949559
--- Diff: library/src/main/java/com/datatorrent/lib/io/fs/BlockWriter.java
---
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Writes a block to the appFS (HDFS on which the app is running). This is a
+ * temporary write to HDFS to handle large files.
+ */
+public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>>
+ implements Partitioner<BlockWriter>
+{
+ /**
+ * Directory under the application directory where blocks get stored
+ */
+ public static final String SUBDIR_BLOCKS = "blocks";
+ /**
+ * List of FileBlockMetadata received in the current window.
+ */
+ private transient List<BlockMetadata.FileBlockMetadata> blockMetadatas;
+
+ /**
+ * Input port to receive block metadata
+ */
+ public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>()
+ {
+
+ @Override
+ public void process(BlockMetadata.FileBlockMetadata blockMetadata)
+ {
+ blockMetadatas.add(blockMetadata);
+ LOG.debug("received blockId {} for file {} ",
blockMetadata.getBlockId(), blockMetadata.getFilePath());
+ }
+ };
+
+ /**
+ * Output port to send block metadata to the downstream operator
+ */
+ public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blockMetadataOutput = new DefaultOutputPort<BlockMetadata.FileBlockMetadata>();
+
+ public BlockWriter()
+ {
+ super();
+ blockMetadatas = Lists.newArrayList();
+ //The base class puts a restriction that the file-path cannot be null. With this
+ //block writer it is initialized in setup() and not through configuration, so it
+ //is set to an empty string here.
+ filePath = "";
+ }
+
+ /**
+ * Initializes the filePath based on the application path before calling the base class setup.
+ */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ filePath = context.getValue(Context.DAGContext.APPLICATION_PATH) + Path.SEPARATOR + SUBDIR_BLOCKS;
+ super.setup(context);
+ }
+
+ /**
+ * Finalizes files for all the blockMetadatas received during the current window
+ */
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+
+ streamsCache.asMap().clear();
+ endOffsets.clear();
+
+ for (BlockMetadata.FileBlockMetadata blockMetadata : blockMetadatas) {
+ try {
+ finalizeFile(Long.toString(blockMetadata.getBlockId()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ blockMetadataOutput.emit(blockMetadata);
+ }
+ blockMetadatas.clear();
+ }
+
+ @Override
+ protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
+ {
+ return Long.toString(tuple.getBlockId());
+ }
+
+ @Override
+ protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
+ {
+ return tuple.getRecord().buffer;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class);
+
+ @Override
+ public Collection<Partition<BlockWriter>> definePartitions(Collection<Partition<BlockWriter>> partitions,
+ PartitioningContext context)
+ {
+ if (context.getParallelPartitionCount() == 0) {
+ return partitions;
+ }
+
+ // if there is no change of count, return the same collection
+ if (context.getParallelPartitionCount() == partitions.size()) {
+ LOG.debug("no change is partition count: " + partitions.size());
+ return partitions;
+ }
+
+ List<BasicCounters<MutableLong>> deletedCounters = Lists.newArrayList();
+
+ LOG.debug("block writer parallel partition count {}",
context.getParallelPartitionCount());
+ int morePartitionsToCreate = context.getParallelPartitionCount() - partitions.size();
+ if (morePartitionsToCreate < 0) {
+ //Delete partitions
+ Iterator<Partition<BlockWriter>> partitionIterator = partitions.iterator();
+
+ while (morePartitionsToCreate++ < 0) {
+ Partition<BlockWriter> toRemove = partitionIterator.next();
+ deletedCounters.add(toRemove.getPartitionedInstance().fileCounters);
+ partitionIterator.remove();
+ }
+ } else {
+ //Add more partitions
+ BlockWriter anOperator = partitions.iterator().next().getPartitionedInstance();
+
+ while (morePartitionsToCreate-- > 0) {
+ DefaultPartition<BlockWriter> partition = new DefaultPartition<BlockWriter>(anOperator);
+ partitions.add(partition);
+ }
+ }
+
+ //transfer the counters
+ BlockWriter targetWriter = partitions.iterator().next().getPartitionedInstance();
--- End diff --
Can we change the base class too?
> HDFS output module for file copy
> --------------------------------
>
> Key: APEXMALHAR-2013
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2013
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Yogi Devendra
> Assignee: Yogi Devendra
>
> To write files to HDFS using a block-by-block approach.
> The main use case is copying files, so the original sequence of blocks has to
> be maintained.
> To achieve this goal, this module would use the information emitted by the
> HDFS input module (APEXMALHAR-2008), namely FileMetaData, BlockMetaData, and
> BlockData.
>
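
For context, below is a minimal sketch of how BlockWriter could be wired into an
application DAG for the file-copy use case described above. The reader operator
(FSSliceReader) and its port names (messages, blocksMetadataOutput) are
assumptions based on the existing block reader classes in Malhar, not something
this patch defines:

{code:java}
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.block.FSSliceReader;
import com.datatorrent.lib.io.fs.BlockWriter;

public class BlockCopyApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Assumed reader: emits file blocks as ReaderRecord<Slice> tuples along
    // with the corresponding block metadata. An upstream file splitter feeding
    // reader.blocksMetadataInput is omitted for brevity.
    FSSliceReader reader = dag.addOperator("BlockReader", new FSSliceReader());
    // Writes each block under <APPLICATION_PATH>/blocks, named by block id.
    BlockWriter writer = dag.addOperator("BlockWriter", new BlockWriter());

    // Block payloads go to the input port inherited from AbstractFileOutputOperator.
    dag.addStream("blockData", reader.messages, writer.input);
    // Block metadata drives per-block file finalization in endWindow().
    dag.addStream("blockMetadata", reader.blocksMetadataOutput, writer.blockMetadataInput);
  }
}
{code}

Since blockMetadataOutput re-emits the metadata only after the corresponding
block file is finalized, a downstream operator (for example, one that stitches
blocks back into the original file) can rely on the block being fully written
by the time it sees the metadata.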
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)