iemejia commented on a change in pull request #14373:
URL: https://github.com/apache/beam/pull/14373#discussion_r611011688



##########
File path: 
sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java
##########
@@ -34,155 +43,196 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl 
and BeamSqlEnv.parseQuery to run queries.
+ * This class executes jobs using BeamSqlEnv, it uses BeamSqlEnv.executeDdl and
+ * BeamSqlEnv.parseQuery to run queries.
  */
 public class BeamSqlEnvRunner {
-    private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
-    private static final String RESULT_DIRECTORY = 
"gs://beamsql_tpcds_1/tpcds_results";
-    private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution 
Summary:";
-    private static final List<String> SUMMARY_HEADERS_LIST = 
Arrays.asList("Query Name", "Job Name", "Data Size", "Dialect", "Status", 
"Start Time", "End Time", "Elapsed Time(sec)");
-
-    private static final Logger Log = 
LoggerFactory.getLogger(BeamSqlEnvRunner.class);
-
-    private static String buildTableCreateStatement(String tableName) {
-        String createStatement = "CREATE EXTERNAL TABLE " + tableName + " (%s) 
TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": 
\"InformixUnload\"}'";
-        return createStatement;
+  private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data";
+  private static final String RESULT_DIRECTORY = 
"gs://beamsql_tpcds_1/tpcds_results";
+  private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution 
Summary:";
+  private static final List<String> SUMMARY_HEADERS_LIST =
+      Arrays.asList(
+          "Query Name",
+          "Job Name",
+          "Data Size",
+          "Dialect",
+          "Status",
+          "Start Time",
+          "End Time",
+          "Elapsed Time(sec)");
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamSqlEnvRunner.class);
+
+  private static String buildTableCreateStatement(String tableName) {
+    return "CREATE EXTERNAL TABLE "
+        + tableName
+        + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", 
\"csvformat\": \"InformixUnload\"}'";
+  }
+
+  private static String buildDataLocation(String dataSize, String tableName) {
+    return DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat";
+  }
+
+  /**
+   * Register all tables into BeamSqlEnv, set their schemas, and set the 
locations where their
+   * corresponding data are stored. Currently this method is not supported by 
ZetaSQL planner.
+   */
+  private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String 
dataSize)
+      throws Exception {
+    List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
+    for (String tableName : tableNames) {
+      String createStatement = buildTableCreateStatement(tableName);
+      String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
+      String dataLocation = buildDataLocation(dataSize, tableName);
+      env.executeDdl(String.format(createStatement, tableSchema, 
dataLocation));
     }
-
-    private static String buildDataLocation(String dataSize, String tableName) 
{
-        String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + 
tableName + ".dat";
-        return dataLocation;
+  }
+
+  /**
+   * Register all tables into InMemoryMetaStore, set their schemas, and set 
the locations where
+   * their corresponding data are stored.
+   */
+  private static void registerAllTablesByInMemoryMetaStore(
+      InMemoryMetaStore inMemoryMetaStore, String dataSize) throws Exception {
+    JSONObject properties = new JSONObject();
+    properties.put("csvformat", "InformixUnload");
+
+    Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
+    for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
+      String tableName = entry.getKey();
+      String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName 
+ ".dat";
+      Schema tableSchema = schemaMap.get(tableName);
+      checkArgumentNotNull(tableSchema, "Table schema can't be null for table: 
" + tableName);
+      Table table =
+          Table.builder()
+              .name(tableName)
+              .schema(tableSchema)
+              .location(dataLocation)
+              .properties(properties)
+              .type("text")
+              .build();
+      inMemoryMetaStore.createTable(table);
     }
-
-    /** Register all tables into BeamSqlEnv, set their schemas, and set the 
locations where their corresponding data are stored.
-     *  Currently this method is not supported by ZetaSQL planner. */
-    private static void registerAllTablesByBeamSqlEnv(BeamSqlEnv env, String 
dataSize) throws Exception {
-        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
-        for (String tableName : tableNames) {
-            String createStatement = buildTableCreateStatement(tableName);
-            String tableSchema = 
TableSchemaJSONLoader.parseTableSchema(tableName);
-            String dataLocation = buildDataLocation(dataSize, tableName);
-            env.executeDdl(String.format(createStatement, tableSchema, 
dataLocation));
-        }
+  }
+
+  /**
+   * Print the summary table after all jobs are finished.
+   *
+   * @param completion A collection of all TpcdsRunResult that are from 
finished jobs.
+   * @param numOfResults The number of results in the collection.
+   * @throws Exception
+   */
+  private static void printExecutionSummary(
+      CompletionService<TpcdsRunResult> completion, int numOfResults) throws 
Exception {
+    List<List<String>> summaryRowsList = new ArrayList<>();
+    for (int i = 0; i < numOfResults; i++) {
+      TpcdsRunResult tpcdsRunResult = completion.take().get();
+      List<String> list = new ArrayList<>();
+      list.add(tpcdsRunResult.getQueryName());
+      list.add(tpcdsRunResult.getJobName());
+      list.add(tpcdsRunResult.getDataSize());
+      list.add(tpcdsRunResult.getDialect());
+      list.add(tpcdsRunResult.getIsSuccessful() ? "Successful" : "Failed");
+      list.add(tpcdsRunResult.getIsSuccessful() ? 
tpcdsRunResult.getStartDate().toString() : "");
+      list.add(tpcdsRunResult.getIsSuccessful() ? 
tpcdsRunResult.getEndDate().toString() : "");
+      list.add(
+          tpcdsRunResult.getIsSuccessful() ? 
Double.toString(tpcdsRunResult.getElapsedTime()) : "");
+      summaryRowsList.add(list);
     }
 
-    /** Register all tables into InMemoryMetaStore, set their schemas, and set 
the locations where their corresponding data are stored. */
-    private static void registerAllTablesByInMemoryMetaStore(InMemoryMetaStore 
inMemoryMetaStore, String dataSize) throws Exception {
-        JSONObject properties = new JSONObject();
-        properties.put("csvformat", "InformixUnload");
-
-        Map<String, Schema> schemaMap = TpcdsSchemas.getTpcdsSchemas();
-        for (String tableName : schemaMap.keySet()) {
-            String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + 
tableName + ".dat";
-            Schema tableSchema = schemaMap.get(tableName);
-            Table table = 
Table.builder().name(tableName).schema(tableSchema).location(dataLocation).properties(properties).type("text").build();
-            inMemoryMetaStore.createTable(table);
-        }
+    System.out.println(SUMMARY_START);
+    System.out.println(SummaryGenerator.generateTable(SUMMARY_HEADERS_LIST, 
summaryRowsList));
+  }
+
+  /**
+   * This is the alternative method in BeamTpcds.main method. Run job using 
BeamSqlEnv.parseQuery()
+   * method. (Doesn't perform well when running query96).
+   *
+   * @param args Command line arguments
+   * @throws Exception
+   */
+  public static void runUsingBeamSqlEnv(String[] args) throws Exception {
+    InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
+    inMemoryMetaStore.registerProvider(new TextTableProvider());
+
+    TpcdsOptions tpcdsOptions =
+        
PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+
+    String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
+    String[] queryNameArr = 
TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
+    int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);
+
+    // Using ExecutorService and CompletionService to fulfill multi-threading 
functionality
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    CompletionService<TpcdsRunResult> completion = new 
ExecutorCompletionService<>(executor);
+
+    // Directly create all tables and register them into inMemoryMetaStore 
before creating
+    // BeamSqlEnv object.
+    registerAllTablesByInMemoryMetaStore(inMemoryMetaStore, dataSize);
+
+    BeamSqlPipelineOptions beamSqlPipelineOptions = 
tpcdsOptions.as(BeamSqlPipelineOptions.class);
+    BeamSqlEnv env =
+        BeamSqlEnv.builder(inMemoryMetaStore)
+            .setPipelineOptions(beamSqlPipelineOptions)
+            .setQueryPlannerClassName(beamSqlPipelineOptions.getPlannerName())
+            .build();
+
+    // Make an array of pipelines, each pipeline is responsible for running a 
corresponding query.
+    Pipeline[] pipelines = new Pipeline[queryNameArr.length];
+
+    // Execute all queries, transform the each result into a 
PCollection<String>, write them into
+    // the txt file and store in a GCP directory.
+    for (int i = 0; i < queryNameArr.length; i++) {
+      // For each query, get a copy of pipelineOptions from command line 
arguments, cast
+      // tpcdsOptions as a DataflowPipelineOptions object to read and set 
required parameters for
+      // pipeline execution.
+      TpcdsOptions tpcdsOptionsCopy =
+          
PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
+      DataflowPipelineOptions dataflowPipelineOptionsCopy =

Review comment:
       This extra copy is unneeded because its only use is to `setJobName` 
which is generic and part of `PipelineOptions` so we can do it instead in the 
TpcdsOptions copy. I even wonder if we really need that copy I think we can 
just reuse the existing `tpcdsOptions` and do `setJobName` for each case. The 
original value is always preserved because we have it in `args.
   

##########
File path: 
sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsParametersReader.java
##########
@@ -22,86 +22,88 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/**
- * Get and check the TpcdsOptions' parameters, throw exceptions when user 
input is invalid
- */
+/** Get and check the TpcdsOptions' parameters, throw exceptions when user 
input is invalid. */
 public class TpcdsParametersReader {
-    /** The data sizes that have been supported. */
-    private static final Set<String> supportedDataSizes = Stream.of("1G", 
"10G", "100G").collect(Collectors.toCollection(HashSet::new));
-
-    /**
-     * Get and check dataSize entered by user. This dataSize has to have been 
supported.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return The dateSize user entered, if it is contained in 
supportedDataSizes set.
-     * @throws Exception
-     */
-    public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws 
Exception {
-        String dataSize = tpcdsOptions.getDataSize();
+  /** The data sizes that have been supported. */
+  private static final Set<String> supportedDataSizes =
+      Stream.of("1G", "10G", 
"100G").collect(Collectors.toCollection(HashSet::new));
 
-        if (!supportedDataSizes.contains(dataSize)) {
-            throw new Exception("The data size you entered has not been 
supported.");
-        }
+  /**
+   * Get and check dataSize entered by user. This dataSize has to have been 
supported.
+   *
+   * @param tpcdsOptions TpcdsOptions object constructed from user input
+   * @return The dateSize user entered, if it is contained in 
supportedDataSizes set.
+   * @throws Exception
+   */
+  public static String getAndCheckDataSize(TpcdsOptions tpcdsOptions) throws 
Exception {
+    String dataSize = tpcdsOptions.getDataSize();
 
-        return dataSize;
+    if (!supportedDataSizes.contains(dataSize)) {
+      throw new Exception("The data size you entered has not been supported.");
     }
 
-    /**
-     * Get and check queries entered by user. This has to be a string of 
numbers separated by commas or "all" which means run all 99 queiries.
-     * All query numbers have to be between 1 and 99.
-     *
-     * @param tpcdsOptions TpcdsOptions object constructed from user input
-     * @return An array of query names, for example "1,2,7" will be output as 
"query1,query2,query7"
-     * @throws Exception
-     */
-    public static String[] getAndCheckQueryNameArray(TpcdsOptions 
tpcdsOptions) throws Exception {
-        String queryNums = tpcdsOptions.getQueries();
+    return dataSize;
+  }
 
-        String[] queryNumArr;
-        if (queryNums.toLowerCase().equals("all")) {
-            // All 99 TPC-DS queries need to be executed.
-            queryNumArr = new String[99];
-            for (int i = 0; i < 99; i++) {
-                queryNumArr[i] = Integer.toString(i + 1);
-            }
-        } else {
-            // Split user input queryNums by spaces and commas, get an array 
of all query numbers.
-            queryNumArr = queryNums.split("[\\s,]+");
+  /**
+   * Get and check queries entered by user. This has to be a string of numbers 
separated by commas
+   * or "all" which means run all 99 queiries. All query numbers have to be 
between 1 and 99.

Review comment:
       s/quieries/queries




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to