[GitHub] carbondata pull request #2850: [WIP] Added concurrent reading through SDK

2018-10-29 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2850#discussion_r229183542
  
--- Diff: 
store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import junit.framework.TestCase;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOExceptionWithCause;
+import org.junit.*;
+
+/**
+ * multi-thread Test suite for {@link CarbonReader}
+ */
+public class ConcurrentSdkReaderTest {
+
+  private static final String dataDir = "./testReadFiles";
+
+  @Before
+  @After
+  public void cleanTestData() {
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  private void writeTestData(long numRows, int tableBlockSize) {
+    Field[] fields = new Field[2];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+
+    Map<String, String> tableProperties = new HashMap<>();
+    tableProperties.put("table_blocksize", Integer.toString(tableBlockSize));
+
+    CarbonWriterBuilder builder =
+        CarbonWriter.builder().outputPath(dataDir).withTableProperties(tableProperties)
+            .withCsvInput(new Schema(fields));
+
+    try {
+      CarbonWriter writer = builder.build();
+
+      for (long i = 0; i < numRows; ++i) {
+        writer.write(new String[] { "robot_" + i, String.valueOf(i) });
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test public void testReadParallely() throws IOException, InterruptedException {
+    long numRows = 1000;
--- End diff --

We must not add a huge-record test case in the UT; it would affect the PR builder 
time for every PR. Test locally with huge data, but update the UT with fewer rows, 
say 10 rows.


---


[GitHub] carbondata pull request #2850: [WIP] Added concurrent reading through SDK

2018-10-29 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2850#discussion_r229183179
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java ---
@@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String 
tablePath) {
 return builder(tablePath, tableName);
   }
 
+  /**
+   * Return a new list of {@link CarbonReader} objects
+   *
+   * @param maxSplits
+   */
+  public List<CarbonReader> split(int maxSplits) throws IOException {
+    validateReader();
+    if (maxSplits < 1) {
+      throw new RuntimeException(
+          this.getClass().getSimpleName() + ".split: maxSplits must be positive");
+    }
+
+    List<CarbonReader> carbonReaders = new ArrayList<>();
+
+    // If maxSplits < readers.size
+    // Split the reader into maxSplits splits with each
+    // element contains >= 1 CarbonRecordReader objects
+    if (maxSplits < this.readers.size()) {
+      for (int i = 0; i < maxSplits; ++i) {
+        carbonReaders.add(new CarbonReader<>(this.readers
+            .subList((int) Math.ceil((float) (i * this.readers.size()) / maxSplits),
--- End diff --

this is constant across iterations; compute it outside the loop and reuse it each time
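To illustrate the suggestion, here is a minimal sketch of hoisting the loop-invariant ratio out of the boundary computation. The class, method, and variable names (`SplitBoundaries`, `boundaries`, `step`) are illustrative, not the PR's final code:

```java
public class SplitBoundaries {
  // Compute subList boundaries for distributing numReaders record readers
  // across maxSplits groups. The ratio numReaders / maxSplits does not change
  // inside the loop, so it is computed once instead of on every iteration.
  static int[] boundaries(int numReaders, int maxSplits) {
    float step = (float) numReaders / maxSplits; // hoisted loop-invariant
    int[] bounds = new int[maxSplits + 1];
    for (int i = 0; i <= maxSplits; ++i) {
      bounds[i] = (int) Math.ceil(i * step);
    }
    return bounds;
  }

  public static void main(String[] args) {
    // 10 readers split 4 ways -> group boundaries 0, 3, 5, 8, 10
    int[] b = boundaries(10, 4);
    for (int x : b) {
      System.out.print(x + " ");
    }
    System.out.println();
  }
}
```

Each group `i` then spans `[bounds[i], bounds[i + 1])`, matching the `subList` bounds in the diff.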


---


[GitHub] carbondata pull request #2850: [WIP] Added concurrent reading through SDK

2018-10-29 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2850#discussion_r229179978
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java ---
@@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String 
tablePath) {
 return builder(tablePath, tableName);
   }
 
+  /**
+   * Return a new list of {@link CarbonReader} objects
+   *
--- End diff --

Add a clear description: mention what happens if maxSplits is greater than the 
number of files, and what happens if maxSplits is less than the number of files.
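One possible wording for such a Javadoc, sketched from the behaviour visible in the surrounding diff (the exact final text is up to the PR author):

```java
/**
 * Splits this reader into a list of {@link CarbonReader} objects that can be
 * consumed concurrently, one per thread.
 *
 * If maxSplits is less than the number of underlying record readers, the
 * record readers are distributed across maxSplits readers, each holding one
 * or more record readers. If maxSplits is greater than or equal to the number
 * of record readers, one reader is returned per record reader, so the
 * returned list may contain fewer than maxSplits elements.
 *
 * @param maxSplits maximum number of readers to return; must be >= 1
 */
```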


---


[GitHub] carbondata pull request #2850: [WIP] Added concurrent reading through SDK

2018-10-29 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2850#discussion_r229179713
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java ---
@@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String 
tablePath) {
 return builder(tablePath, tableName);
   }
 
+  /**
+   * Return a new list of {@link CarbonReader} objects
+   *
+   * @param maxSplits
+   */
+  public List<CarbonReader> split(int maxSplits) throws IOException {
+    validateReader();
+    if (maxSplits < 1) {
+      throw new RuntimeException(
+          this.getClass().getSimpleName() + ".split: maxSplits must be positive");
+    }
+
+    List<CarbonReader> carbonReaders = new ArrayList<>();
+
+    // If maxSplits < readers.size
+    // Split the reader into maxSplits splits with each
+    // element contains >= 1 CarbonRecordReader objects
+    if (maxSplits < this.readers.size()) {
+      for (int i = 0; i < maxSplits; ++i) {
+        carbonReaders.add(new CarbonReader<>(this.readers
+            .subList((int) Math.ceil((float) (i * this.readers.size()) / maxSplits),
+                (int) Math.ceil((float) ((i + 1) * this.readers.size()) / maxSplits))));
+      }
+    }
+    // If maxSplits >= readers.size
+    // Split the reader into reader.size splits with each
--- End diff --

keep the comments inside the else block for easier reading of the code
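Restructured along those lines, the branching might read as below. This is a generic, self-contained sketch using `List<Integer>` in place of the SDK's record readers; class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
  // Split items into at most maxSplits non-empty groups.
  static <T> List<List<T>> split(List<T> items, int maxSplits) {
    if (maxSplits < 1) {
      throw new IllegalArgumentException("maxSplits must be positive");
    }
    List<List<T>> groups = new ArrayList<>();
    if (maxSplits < items.size()) {
      // Fewer splits than items: distribute the items across maxSplits
      // groups, each group holding one or more items.
      float step = (float) items.size() / maxSplits;
      for (int i = 0; i < maxSplits; ++i) {
        groups.add(new ArrayList<>(
            items.subList((int) Math.ceil(i * step), (int) Math.ceil((i + 1) * step))));
      }
    } else {
      // At least as many splits as items: one group per item, so the
      // result may contain fewer than maxSplits groups.
      for (T item : items) {
        List<T> group = new ArrayList<>();
        group.add(item);
        groups.add(group);
      }
    }
    return groups;
  }

  public static void main(String[] args) {
    List<Integer> items = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
      items.add(i);
    }
    System.out.println(split(items, 4).size());  // 4 groups
    System.out.println(split(items, 16).size()); // 10 groups, one per item
  }
}
```

With the comments living inside each branch, the reader sees the explanation exactly where the corresponding case is handled.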


---


[GitHub] carbondata pull request #2850: [WIP] Added concurrent reading through SDK

2018-10-29 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2850#discussion_r229179428
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java ---
@@ -114,6 +117,43 @@ public static CarbonReaderBuilder builder(String 
tablePath) {
 return builder(tablePath, tableName);
   }
 
+  /**
+   * Return a new list of {@link CarbonReader} objects
+   *
+   * @param maxSplits
+   */
+  public List split(int maxSplits) throws IOException {
--- End diff --

New interfaces exposed by the SDK need to be documented in sdk-guide.md; you can 
add this API's info there.


---


[GitHub] carbondata pull request #2850: [WIP] Added concurrent reading through SDK

2018-10-27 Thread xubo245
Github user xubo245 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2850#discussion_r228706363
  
--- Diff: 
store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import junit.framework.TestCase;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOExceptionWithCause;
+import org.junit.*;
+
+/**
+ * multi-thread Test suite for {@link CarbonReader}
+ */
+public class ConcurrentSdkReaderTest extends TestCase {
+
+  private static final String dataDir = "./testReadFiles";
+
+  public void cleanTestData() {
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  private void writeTestData(long numRows, int tableBlockSize) {
+    cleanTestData();
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+
+    Map<String, String> tableProperties = new HashMap<>();
+    tableProperties.put("table_blocksize", Integer.toString(tableBlockSize));
+
+    CarbonWriterBuilder builder =
+        CarbonWriter.builder().outputPath(dataDir).withTableProperties(tableProperties)
+            .withCsvInput(new Schema(fields));
+
+    try {
+      CarbonWriter writer = builder.build();
+
+      for (long i = 0; i < numRows; ++i) {
+        writer.write(new String[] { "robot_" + i, String.valueOf(i) });
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test public void testReadParallely() throws IOException, InterruptedException {
+    long numRows = 1000;
+    int tableBlockSize = 10;
+    short numThreads = 4;
+    writeTestData(numRows, tableBlockSize);
+    long count;
+
+    CarbonReader reader = CarbonReader.builder(dataDir).build();
+    try {
+      count = 0;
+      long start = System.currentTimeMillis();
+      while (reader.hasNext()) {
+        reader.readNextRow();
+        count += 1;
+      }
+      long end = System.currentTimeMillis();
+      System.out.println("[Sequential read] Time:" + (end - start));
+      Assert.assertEquals(numRows, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      reader.close();
+    }
+
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    CarbonReader reader2 = CarbonReader.builder(dataDir).build();
+    try {
+      List<CarbonReader> multipleReaders = reader2.split(numThreads);
+      List<Future> results = new ArrayList<>();
+      count = 0;
+      long start = System.currentTimeMillis();
+      for (CarbonReader reader_i : multipleReaders) {
+        results.add(executorService.submit(new ReadLogic(reader_i)));
+      }
+      for (Future result_i : results) {
+        count += (long) result_i.get();
+      }
+      long end = System.currentTimeMillis();
+      System.out.println("[Parallel read] Time:" + (end - start));
--- End diff --

Please add a unit to it, such as ms


---


[GitHub] carbondata pull request #2850: [WIP] Added concurrent reading through SDK

2018-10-27 Thread xubo245
Github user xubo245 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2850#discussion_r228706276
  
--- Diff: 
store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import junit.framework.TestCase;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOExceptionWithCause;
+import org.junit.*;
+
+/**
+ * multi-thread Test suite for {@link CarbonReader}
+ */
+public class ConcurrentSdkReaderTest extends TestCase {
+
+  private static final String dataDir = "./testReadFiles";
+
+  public void cleanTestData() {
--- End diff --

you can add @Before or @After here, especially since there are many test cases


---