>From <[email protected]>:

[email protected] has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18209 )


Change subject: [WIP] Support COPY TO in parquet
......................................................................

[WIP] Support COPY TO in parquet

Change-Id: I40dc16969e66af09cde04b460f441af666b39d51
---
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
A 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M asterixdb/asterix-om/pom.xml
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
A 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java
A 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java
10 files changed, 861 insertions(+), 4 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/09/18209/1

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 1e90f98..fd88b86 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -80,7 +80,7 @@
       <compilation-unit name="supported-adapter-format-compression">
         <output-dir 
compare="Text">supported-adapter-format-compression</output-dir>
         <expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. 
Supported adapters: [localfs, s3]</expected-error>
-        <expected-error>ASX1189: Unsupported writing format 'csv'. Supported 
formats: [json]</expected-error>
+        <expected-error>ASX1189: Unsupported writing format 'csv'. Supported 
formats: [json, parquet]</expected-error>
         <expected-error>ASX1096: Unknown compression scheme rar. Supported 
schemes are [gzip]</expected-error>
       </compilation-unit>
     </test-case>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 79252ad..86c8ef3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -82,6 +82,7 @@
     public static final String KEY_EXPRESSION = "expression";
     public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
     public static final String KEY_FORMAT = "format";
+    public static final String SCHEMA_FORMAT = "schema";
     public static final String KEY_INCLUDE = "include";
     public static final String KEY_EXCLUDE = "exclude";
     public static final String KEY_QUOTE = "quote";
@@ -317,7 +318,7 @@
     public static final Set<String> WRITER_SUPPORTED_COMPRESSION;

     static {
-        WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
+        WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, 
FORMAT_PARQUET);
         WRITER_SUPPORTED_ADAPTERS = 
Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), 
KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
         WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
new file mode 100644
index 0000000..59783e5
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.writer.printer;
+
+import static 
org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.om.pointables.printer.parquet.AsterixParquetWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * {@link IExternalPrinter} that writes AsterixDB records as Parquet. Every {@link #newStream(OutputStream)} call
+ * starts a new Parquet file on the given stream; the previous file (if any) is finished first via {@link #close()}.
+ */
+public class TextualExternalFileParquetPrinter implements IExternalPrinter {
+    // NOTE(review): these sizes (bytes) look like test values; 1 KiB row groups are far smaller than usual
+    // Parquet row groups. TODO confirm and make them configurable.
+    private static final int ROW_GROUP_SIZE = 1024;
+    private static final int PAGE_SIZE = 1024;
+    private static final int DICTIONARY_PAGE_SIZE = 512;
+
+    private final IPrinter printer;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+    // Type of the records to print; handed to the Parquet write support through the writer builder
+    private final Object typeInfo;
+    // Parquet message-type schema text as provided by the user; parsed once into 'schema'
+    private final String schemaString;
+    private MessageType schema;
+    private TextualOutputStreamDelegate delegate;
+    private PrintStream printStream;
+    private ParquetWriter<IValueReference> writer;
+
+    public TextualExternalFileParquetPrinter(IPrinter printer, IExternalFileCompressStreamFactory compressStreamFactory,
+            String schemaString, Object typeInfo) {
+        this.printer = printer;
+        this.compressStreamFactory = compressStreamFactory;
+        this.schemaString = schemaString;
+        this.typeInfo = typeInfo;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.init();
+    }
+
+    /**
+     * Finishes the current Parquet file (if any) and starts a new one written to {@code outputStream}, which is
+     * first wrapped by the configured compression stream factory.
+     *
+     * @throws HyracksDataException if the schema cannot be parsed or the Parquet writer cannot be created
+     */
+    @Override
+    public void newStream(OutputStream outputStream) throws HyracksDataException {
+        if (printStream != null) {
+            close();
+        }
+        // parse before creating any stream so an invalid schema does not leave a half-open stream behind
+        MessageType messageType = getParsedSchema();
+        delegate = new TextualOutputStreamDelegate(compressStreamFactory.createStream(outputStream));
+        printStream = new PrintStream(delegate);
+        try {
+            writer = AsterixParquetWriter.builder(new StreamOutputFile(printStream))
+                    .withCompressionCodec(UNCOMPRESSED).withType(messageType).withTypeInfo(typeInfo)
+                    .withRowGroupSize(ROW_GROUP_SIZE).withPageSize(PAGE_SIZE)
+                    .withDictionaryPageSize(DICTIONARY_PAGE_SIZE).enableDictionaryEncoding().withValidation(false)
+                    .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0).withConf(new Configuration())
+                    .build();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Writes one record to the current Parquet file.
+     *
+     * @throws HyracksDataException on I/O failure or when the record cannot be converted
+     */
+    @Override
+    public void print(IValueReference value) throws HyracksDataException {
+        try {
+            writer.write(value);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        } catch (RuntimeException e) {
+            // the Parquet WriteSupport cannot throw checked exceptions, so ObjectWriteSupport may wrap a
+            // HyracksDataException in a RuntimeException; surface the original exception to callers
+            if (e.getCause() instanceof HyracksDataException) {
+                throw (HyracksDataException) e.getCause();
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Finishes the current Parquet file and releases its stream. Safe to call repeatedly: once the stream is
+     * released, further calls do nothing.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (printStream == null) {
+            return;
+        }
+        try {
+            if (writer != null) {
+                // finishes the Parquet file; expected to also close the underlying stream
+                writer.close();
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        } finally {
+            writer = null;
+            // PrintStream.close() is a no-op if the stream was already closed by the Parquet writer
+            printStream.close();
+            printStream = null;
+        }
+        delegate.checkError();
+        delegate = null;
+    }
+
+    // Parses the user-provided schema on first use and caches it for subsequent streams
+    private MessageType getParsedSchema() throws HyracksDataException {
+        if (schema == null) {
+            try {
+                schema = parseMessageType(schemaString);
+            } catch (Exception e) {
+                throw new HyracksDataException("Illegal Parquet Schema provided", e);
+            }
+        }
+        return schema;
+    }
+
+    /**
+     * {@link OutputFile} adapter over an already-open stream: both create methods return the same
+     * position-tracking stream wrapping it.
+     */
+    private static class StreamOutputFile implements OutputFile {
+        // 32 MiB; only reported, block sizes are not supported by this output file
+        private static final long DEFAULT_BLOCK_SIZE = 32L * 1024 * 1024;
+
+        private final PositionOutputStream pos;
+
+        StreamOutputFile(PrintStream ps) {
+            FSDataOutputStream fs = new FSDataOutputStream(ps, new FileSystem.Statistics("test"));
+            this.pos = HadoopStreams.wrap(fs);
+        }
+
+        @Override
+        public PositionOutputStream create(long blockSizeHint) throws IOException {
+            return pos;
+        }
+
+        @Override
+        public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+            return pos;
+        }
+
+        @Override
+        public boolean supportsBlockSize() {
+            return false;
+        }
+
+        @Override
+        public long defaultBlockSize() {
+            return DEFAULT_BLOCK_SIZE;
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
new file mode 100644
index 0000000..86fb2a4
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+/**
+ * Factory that creates one {@link TextualExternalFileParquetPrinter} per call to {@link #createPrinter()}, all
+ * sharing the same schema, record type and compression settings.
+ */
+public class TextualExternalFileParquetPrinterFactory implements IExternalPrinterFactory {
+    private static final long serialVersionUID = 8971234908711234L;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+    // Parquet message-type schema text; parsed by each printer
+    private final String schema;
+    private final IPrinterFactory printerFactory;
+    // Type of the records to print. NOTE(review): presumably an IAType; it must be Serializable if this
+    // factory is serialized (serialVersionUID suggests so) — confirm.
+    private final Object typeInfo;
+
+    public TextualExternalFileParquetPrinterFactory(IPrinterFactory printerFactory,
+            IExternalFileCompressStreamFactory compressStreamFactory, String schema, Object typeInfo) {
+        this.printerFactory = printerFactory;
+        this.compressStreamFactory = compressStreamFactory;
+        this.schema = schema;
+        this.typeInfo = typeInfo;
+    }
+
+    @Override
+    public IExternalPrinter createPrinter() {
+        return new TextualExternalFileParquetPrinter(printerFactory.createPrinter(), compressStreamFactory, schema,
+                typeInfo);
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index cf51200..a39186d 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -30,6 +30,7 @@
 import 
org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
 import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
 import 
org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
+import 
org.apache.asterix.external.writer.printer.TextualExternalFileParquetPrinterFactory;
 import 
org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
 import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -108,7 +109,8 @@
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);

         // Only JSON is supported for now
-        if 
(!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)) {
+        if 
(!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)
+                && 
!ExternalDataConstants.FORMAT_PARQUET.equalsIgnoreCase(format)) {
             throw new UnsupportedOperationException("Unsupported format " + 
format);
         }

@@ -116,7 +118,21 @@
         IExternalFileCompressStreamFactory compressStreamFactory =
                 createCompressionStreamFactory(appCtx, compression, 
configuration);
         IPrinterFactory printerFactory = 
CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
-        return new TextualExternalFilePrinterFactory(printerFactory, 
compressStreamFactory);
+
+        switch (format) {
+            case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
+                return new TextualExternalFilePrinterFactory(printerFactory, 
compressStreamFactory);
+            case ExternalDataConstants.FORMAT_PARQUET:
+
+                if 
(!configuration.containsKey(ExternalDataConstants.SCHEMA_FORMAT)) {
+                    throw new UnsupportedOperationException("Schema not 
provided for parquet");
+                }
+                String schema = 
configuration.get(ExternalDataConstants.SCHEMA_FORMAT);
+                return new 
TextualExternalFileParquetPrinterFactory(printerFactory, compressStreamFactory, 
schema,
+                        sourceType);
+            default:
+                throw new UnsupportedOperationException("Unsupported format " 
+ format);
+        }
     }

     private static String getFormat(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-om/pom.xml b/asterixdb/asterix-om/pom.xml
index 6db4840..97b3946 100644
--- a/asterixdb/asterix-om/pom.xml
+++ b/asterixdb/asterix-om/pom.xml
@@ -160,5 +160,18 @@
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil</artifactId>
     </dependency>
+      <dependency>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-column</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>3.3.6</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop</artifactId>
+      </dependency>
   </dependencies>
 </project>
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java
new file mode 100644
index 0000000..57cc909
--- /dev/null
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * {@link ParquetWriter} for AsterixDB values held in {@link IValueReference}s. Configure it through
+ * {@link #builder(OutputFile)} or {@link #builder(Path)}.
+ */
+public class AsterixParquetWriter extends ParquetWriter<IValueReference> {
+    public static Builder builder(Path file) {
+        return new Builder(file);
+    }
+
+    public static Builder builder(OutputFile file) {
+        return new Builder(file);
+    }
+
+    AsterixParquetWriter(Path file, WriteSupport<IValueReference> writeSupport,
+            CompressionCodecName compressionCodecName, int blockSize, int pageSize, boolean enableDictionary,
+            boolean enableValidation, ParquetProperties.WriterVersion writerVersion, Configuration conf)
+            throws IOException {
+        // pageSize is passed twice: it is used both as the data page size and as the dictionary page size
+        super(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary,
+                enableValidation, writerVersion, conf);
+    }
+
+    /** Builder adding the Parquet schema, the AsterixDB record type and extra file metadata. */
+    public static class Builder extends ParquetWriter.Builder<IValueReference, Builder> {
+        private MessageType type;
+        private Object typeInfo;
+        private Map<String, String> extraMetaData = new HashMap<>();
+
+        private Builder(Path file) {
+            super(file);
+        }
+
+        private Builder(OutputFile file) {
+            super(file);
+        }
+
+        /** Sets the Parquet schema of the file; required. */
+        public Builder withType(MessageType type) {
+            this.type = type;
+            return this;
+        }
+
+        /** Sets the type of the AsterixDB records to write. */
+        public Builder withTypeInfo(Object typeInfo) {
+            this.typeInfo = typeInfo;
+            return this;
+        }
+
+        /** Sets extra key/value metadata handed to Parquet through the write context. */
+        public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+            this.extraMetaData = extraMetaData;
+            return this;
+        }
+
+        @Override
+        protected Builder self() {
+            return this;
+        }
+
+        @Override
+        protected WriteSupport<IValueReference> getWriteSupport(Configuration conf) {
+            // fail fast with a clear message instead of a null schema error deep inside Parquet
+            if (type == null) {
+                throw new IllegalStateException("Parquet schema must be set with withType(MessageType)");
+            }
+            return new ObjectWriteSupport(type, typeInfo, extraMetaData);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java
new file mode 100644
index 0000000..db88c4c
--- /dev/null
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Parquet {@link WriteSupport} that turns AsterixDB records, given as {@link IValueReference}s, into Parquet
+ * {@link RecordConsumer} events by delegating to a {@link ParquetRecordLazyVisitor}.
+ */
+public class ObjectWriteSupport extends WriteSupport<IValueReference> {
+    private final MessageType schema;
+    private final Map<String, String> extraMetaData;
+    private final ParquetRecordLazyVisitor parquetRecordLazyVisitor;
+    // provided by Parquet through prepareForWrite(); expected to be set before the first write()
+    private RecordConsumer recordConsumer;
+
+    /**
+     * @param schema        Parquet schema of the file being written
+     * @param typeInfo      type of the records to write, handed to the visitor
+     * @param extraMetaData extra key/value metadata passed to Parquet in the write context
+     */
+    public ObjectWriteSupport(MessageType schema, Object typeInfo, Map<String, String> extraMetaData) {
+        this.schema = schema;
+        this.extraMetaData = extraMetaData;
+        this.parquetRecordLazyVisitor = new ParquetRecordLazyVisitor(schema, typeInfo);
+    }
+
+    @Override
+    public String getName() {
+        return "asterix";
+    }
+
+    @Override
+    public WriteSupport.WriteContext init(Configuration configuration) {
+        return new WriteSupport.WriteContext(schema, extraMetaData);
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.recordConsumer = recordConsumer;
+    }
+
+    /**
+     * Writes one record. {@link WriteSupport#write} cannot throw checked exceptions, so a
+     * {@link HyracksDataException} raised while visiting the record is rethrown wrapped in a
+     * {@link RuntimeException}.
+     */
+    @Override
+    public void write(IValueReference valueReference) {
+        try {
+            parquetRecordLazyVisitor.consumeRecord(valueReference, recordConsumer, schema);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java
new file mode 100644
index 0000000..7f045b5
--- /dev/null
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import static 
org.apache.asterix.om.pointables.printer.parquet.ParquetRecordVisitorUtils.*;
+
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Walks an AsterixDB record lazily and emits the matching Parquet {@link RecordConsumer} events, following the
+ * given Parquet schema. Lists (arrays and multisets) are expected to use the three-level layout
+ * {@code <list group> { repeated group LIST_FIELD { ELEMENT_FIELD } }}.
+ */
+public class ParquetRecordLazyVisitor implements ILazyVisitablePointableVisitor<Void, Pair<RecordConsumer, Type>> {
+
+    private final MessageType schema;
+    private final Object typeInfo;
+    // created on the first record and reused for all following ones, since the record type never changes
+    private RecordLazyVisitablePointable rec;
+
+    public ParquetRecordLazyVisitor(MessageType schema, Object typeInfo) {
+        this.schema = schema;
+        this.typeInfo = typeInfo;
+    }
+
+    public MessageType getSchema() {
+        return schema;
+    }
+
+    /** Writes every present field of the record as a field of the group type given in {@code arg.second}. */
+    @Override
+    public Void visit(RecordLazyVisitablePointable pointable, Pair<RecordConsumer, Type> arg)
+            throws HyracksDataException {
+        RecordConsumer recordConsumer = arg.first;
+        GroupType groupType = arg.second.asGroupType();
+
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            String columnName = Stringify(pointable.getFieldName());
+            AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+            if (isAbsent(child)) {
+                // Parquet forbids empty fields: a null/missing value is encoded by omitting the field entirely
+                continue;
+            }
+            int fieldIndex = groupType.getFieldIndex(columnName);
+            recordConsumer.startField(columnName, fieldIndex);
+            writeValue(recordConsumer, child, groupType.getType(fieldIndex));
+            recordConsumer.endField(columnName, fieldIndex);
+        }
+        return null;
+    }
+
+    /**
+     * Writes the list elements as repetitions of the repeated {@code LIST_FIELD} group inside the (already
+     * started) list group given in {@code arg.second}.
+     */
+    @Override
+    public Void visit(AbstractListLazyVisitablePointable pointable, Pair<RecordConsumer, Type> arg)
+            throws HyracksDataException {
+        int numberOfChildren = pointable.getNumberOfChildren();
+        if (numberOfChildren == 0) {
+            // an empty list is the enclosing list group without any repetition
+            return null;
+        }
+        RecordConsumer recordConsumer = arg.first;
+        GroupType listType = arg.second.asGroupType();
+        int listFieldIndex = listType.getFieldIndex(LIST_FIELD);
+        GroupType repeatedType = listType.getType(listFieldIndex).asGroupType();
+        int elementFieldIndex = repeatedType.getFieldIndex(ELEMENT_FIELD);
+        Type elementType = repeatedType.getType(elementFieldIndex);
+
+        // all values of a repeated field must be written between a single startField/endField pair
+        recordConsumer.startField(LIST_FIELD, listFieldIndex);
+        for (int i = 0; i < numberOfChildren; i++) {
+            pointable.nextChild();
+            AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+            recordConsumer.startGroup();
+            // a null/missing element is encoded by omitting the element field of its repetition
+            if (!isAbsent(child)) {
+                recordConsumer.startField(ELEMENT_FIELD, elementFieldIndex);
+                writeValue(recordConsumer, child, elementType);
+                recordConsumer.endField(ELEMENT_FIELD, elementFieldIndex);
+            }
+            recordConsumer.endGroup();
+        }
+        recordConsumer.endField(LIST_FIELD, listFieldIndex);
+        return null;
+    }
+
+    /** Writes a scalar value into the current column, using the primitive type given in {@code arg.second}. */
+    @Override
+    public Void visit(FlatLazyVisitablePointable pointable, Pair<RecordConsumer, Type> arg)
+            throws HyracksDataException {
+        addValueToColumn(arg.first, pointable, arg.second.asPrimitiveType());
+        return null;
+    }
+
+    /**
+     * Emits one complete Parquet message for the record serialized in {@code valueReference}.
+     *
+     * @throws HyracksDataException if the configured type is not an object type
+     */
+    public void consumeRecord(IValueReference valueReference, RecordConsumer recordConsumer, MessageType schema)
+            throws HyracksDataException {
+        RecordLazyVisitablePointable record = getRecordPointable();
+        record.set(valueReference);
+        recordConsumer.startMessage();
+        record.accept(this, new Pair<>(recordConsumer, schema));
+        recordConsumer.endMessage();
+    }
+
+    // Returns the reusable record pointable, validating the configured type when it is first created
+    private RecordLazyVisitablePointable getRecordPointable() throws HyracksDataException {
+        if (rec == null) {
+            IAType type = (IAType) typeInfo;
+            if (type.getTypeTag() != ATypeTag.OBJECT) {
+                throw new HyracksDataException("Type Unsupported for parquet printing");
+            }
+            rec = new TypedRecordLazyVisitablePointable((ARecordType) type);
+        }
+        return rec;
+    }
+
+    // Emits one value inside an already-started field; nested values (objects, arrays, multisets) are groups
+    private void writeValue(RecordConsumer recordConsumer, AbstractLazyVisitablePointable value, Type type)
+            throws HyracksDataException {
+        if (isNested(value)) {
+            recordConsumer.startGroup();
+            value.accept(this, new Pair<>(recordConsumer, type));
+            recordConsumer.endGroup();
+        } else {
+            value.accept(this, new Pair<>(recordConsumer, type));
+        }
+    }
+
+    // True for values that map to Parquet groups
+    private static boolean isNested(AbstractLazyVisitablePointable value) {
+        switch (value.getTypeTag()) {
+            case OBJECT:
+            case ARRAY:
+            case MULTISET:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    // True for values that have no Parquet representation and must be omitted
+    private static boolean isAbsent(AbstractLazyVisitablePointable value) {
+        ATypeTag tag = value.getTypeTag();
+        return tag == ATypeTag.NULL || tag == ATypeTag.MISSING;
+    }
+
+}
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java
new file mode 100644
index 0000000..c2335e0
--- /dev/null
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import 
org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class ParquetRecordVisitorUtils {
+
+    public static final String LIST_FIELD = "list";
+    public static final String ELEMENT_FIELD = "element";
+
+    public static String Stringify(IValueReference valueReference) throws 
HyracksDataException {
+        ByteArrayAccessibleInputStream in = new 
ByteArrayAccessibleInputStream(new byte[] {}, 0, 0);
+        DataInputStream dataIn = new DataInputStream(in);
+        UTF8StringReader utf8Reader = new UTF8StringReader();
+
+        in.setContent(valueReference.getByteArray(), 
valueReference.getStartOffset(), valueReference.getLength());
+        try {
+            return UTF8StringUtil.readUTF8(dataIn, utf8Reader);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static void addValueToColumn(RecordConsumer recordConsumer, 
FlatLazyVisitablePointable pointable,
+            PrimitiveType type) throws HyracksDataException {
+
+        ATypeTag typeTag = pointable.getTypeTag();
+
+        final byte[] b = pointable.getByteArray();
+        final int s, l;
+
+        if (pointable.isTagged()) {
+            s = pointable.getStartOffset() + 1;
+            l = pointable.getLength() - 1;
+        } else {
+            s = pointable.getStartOffset();
+            l = pointable.getLength();
+        }
+
+        VoidPointable voidPointable = VoidPointable.FACTORY.createPointable();
+        voidPointable.set(b, s, l);
+
+        PrimitiveType.PrimitiveTypeName primitiveTypeName = 
type.getPrimitiveTypeName();
+
+        switch (typeTag) {
+            case TINYINT:
+                byte tinyIntValue = AInt8SerializerDeserializer.getByte(b, s);
+                switch (primitiveTypeName) {
+                    case INT32:
+                        recordConsumer.addInteger(tinyIntValue);
+                        break;
+                    case INT64:
+                        recordConsumer.addLong(tinyIntValue);
+                        break;
+                    case FLOAT:
+                        recordConsumer.addFloat(tinyIntValue);
+                        break;
+                    case DOUBLE:
+                        recordConsumer.addDouble(tinyIntValue);
+                        break;
+                    case BOOLEAN:
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case SMALLINT:
+                short smallIntValue = AInt16SerializerDeserializer.getShort(b, 
s);
+                switch (primitiveTypeName) {
+                    case INT32:
+                        recordConsumer.addInteger(smallIntValue);
+                        break;
+                    case INT64:
+                        recordConsumer.addLong(smallIntValue);
+                        break;
+                    case FLOAT:
+                        recordConsumer.addFloat(smallIntValue);
+                        break;
+                    case DOUBLE:
+                        recordConsumer.addDouble(smallIntValue);
+                        break;
+                    case BOOLEAN:
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case INTEGER:
+                int intValue = AInt32SerializerDeserializer.getInt(b, s);
+                switch (primitiveTypeName) {
+                    case INT32:
+                        recordConsumer.addInteger(intValue);
+                        break;
+                    case INT64:
+                        recordConsumer.addLong(intValue);
+                        break;
+                    case FLOAT:
+                        recordConsumer.addFloat(intValue);
+                        break;
+                    case DOUBLE:
+                        recordConsumer.addDouble(intValue);
+                        break;
+                    case BOOLEAN:
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case BIGINT:
+                long bigIntValue = AInt64SerializerDeserializer.getLong(b, s);
+                switch (primitiveTypeName) {
+                    case INT64:
+                        recordConsumer.addLong(bigIntValue);
+                        break;
+                    case FLOAT:
+                        recordConsumer.addFloat(bigIntValue);
+                        break;
+                    case DOUBLE:
+                        recordConsumer.addDouble(bigIntValue);
+                        break;
+                    case INT32:
+                    case BOOLEAN:
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case FLOAT:
+                float floatValue = AFloatSerializerDeserializer.getFloat(b, s);
+                switch (primitiveTypeName) {
+                    case INT32:
+                        recordConsumer.addInteger((int) floatValue);
+                        break;
+                    case INT64:
+                        recordConsumer.addLong((long) floatValue);
+                        break;
+                    case FLOAT:
+                        recordConsumer.addFloat(floatValue);
+                        break;
+                    case DOUBLE:
+                        recordConsumer.addDouble(floatValue);
+                        break;
+                    case BOOLEAN:
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case DOUBLE:
+                double doubleValue = 
ADoubleSerializerDeserializer.getDouble(b, s);
+                switch (primitiveTypeName) {
+                    case INT32:
+                        recordConsumer.addInteger((int) doubleValue);
+                        break;
+                    case INT64:
+                        recordConsumer.addLong((long) doubleValue);
+                        break;
+                    case FLOAT:
+                        recordConsumer.addFloat((float) doubleValue);
+                        break;
+                    case DOUBLE:
+                        recordConsumer.addDouble(doubleValue);
+                        break;
+                    case BOOLEAN:
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case STRING:
+                String stringValue = Stringify(voidPointable);
+                switch (primitiveTypeName) {
+                    case BINARY:
+                        
recordConsumer.addBinary(Binary.fromString(stringValue));
+                        break;
+                    case BOOLEAN:
+                    case INT32:
+                    case INT64:
+                    case FLOAT:
+                    case DOUBLE:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case BOOLEAN:
+                boolean booleanValue = 
ABooleanSerializerDeserializer.getBoolean(b, s);
+                switch (primitiveTypeName) {
+                    case BOOLEAN:
+                        recordConsumer.addBoolean(booleanValue);
+                        break;
+                    case BINARY:
+                    case INT32:
+                    case INT64:
+                    case FLOAT:
+                    case DOUBLE:
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case INT96:
+                    default:
+                        throw new HyracksDataException(
+                                "Typecast impossible from " + typeTag + " to " 
+ primitiveTypeName);
+                }
+                break;
+            case DATE:
+            case TIME:
+            case DURATION:
+            case POINT:
+            case POINT3D:
+            case ARRAY:
+            case MULTISET:
+            case OBJECT:
+            case SPARSOBJECT:
+            case UNION:
+            case ENUM:
+            case TYPE:
+            case ANY:
+            case LINE:
+            case POLYGON:
+            case CIRCLE:
+            case RECTANGLE:
+            case INTERVAL:
+            case SYSTEM_NULL:
+            case YEARMONTHDURATION:
+            case DAYTIMEDURATION:
+            case UUID:
+            case SHORTWITHOUTTYPEINFO:
+            case NULL:
+            case GEOMETRY:
+            case BINARY:
+            case UINT8:
+            case UINT16:
+            case UINT32:
+            case UINT64:
+            case BITARRAY:
+            case MISSING:
+            case DATETIME:
+            default:
+                throw new HyracksDataException("TYPE " + typeTag + " 
UNEXPECTED");
+        }
+
+    }
+
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18209
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I40dc16969e66af09cde04b460f441af666b39d51
Gerrit-Change-Number: 18209
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange

Reply via email to