[jira] [Resolved] (SPARK-22841) Select regexp_extract from table with where clause having is null throws indexoutofbounds exception
[ https://issues.apache.org/jira/browse/SPARK-22841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-22841. --- Resolution: Not A Problem > Select regexp_extract from table with where clause having is null throws > indexoutofbounds exception > --- > > Key: SPARK-22841 > URL: https://issues.apache.org/jira/browse/SPARK-22841 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Chetan Bhat > > Steps : > Thrift server is started using the command - bin/spark-submit --master > yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G > --num-executors 3 --class > org.apache.carbondata.spark.thriftserver.CarbonThriftServer > /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar > "hdfs://hacluster/user/sparkhive/warehouse" > Spark shell is launched using the command - bin/spark-shell --master > yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G > --num-executors 3 --jars > /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar > From Spark shell the streaming table is created and data is loaded to the > streaming table. > import java.io. > {File, PrintWriter} > import java.net.ServerSocket > import org.apache.spark.sql. > {CarbonEnv, SparkSession} > import org.apache.spark.sql.hive.CarbonRelation > import org.apache.spark.sql.streaming. > {ProcessingTime, StreamingQuery} > import org.apache.carbondata.core.constants.CarbonCommonConstants > import org.apache.carbondata.core.util.CarbonProperties > import org.apache.carbondata.core.util.path. > {CarbonStorePath, CarbonTablePath} > CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, > "/MM/dd") > import org.apache.spark.sql.CarbonSession._ > val carbonSession = SparkSession. > builder(). > appName("StreamExample"). > getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store") > carbonSession.sparkContext.setLogLevel("INFO") > def sql(sql: String) = carbonSession.sql(sql) > def writeSocket(serverSocket: ServerSocket): Thread = { > val thread = new Thread() { > override def run(): Unit = { > // wait for client to connection request and accept > val clientSocket = serverSocket.accept() > val socketWriter = new PrintWriter(clientSocket.getOutputStream()) > var index = 0 > for (_ <- 1 to 1000) { > // write 5 records per iteration > for (_ <- 0 to 100) > { index = index + 1 socketWriter.println(index.toString + ",name_" + index + > ",city_" + index + "," + (index * 1.00).toString + ",school_" + index + > ":school_" + index + index + "$" + index) } > socketWriter.flush() > Thread.sleep(2000) > } > socketWriter.close() > System.out.println("Socket closed") > } > } > thread.start() > thread > } > def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, > tableName: String, port: Int): Thread = { > val thread = new Thread() { > override def run(): Unit = { > var qry: StreamingQuery = null > try > { val readSocketDF = spark.readStream .format("socket") .option("host", > "10.18.98.34") .option("port", port) .load() qry = readSocketDF.writeStream > .format("carbondata") .trigger(ProcessingTime("5 seconds")) > .option("checkpointLocation", tablePath.getStreamingCheckpointDir) > .option("tablePath", tablePath.getPath).option("tableName", tableName) > .start() qry.awaitTermination() } > catch > { case ex: Throwable => ex.printStackTrace() println("Done reading and > writing streaming data") } > finally > { qry.stop() } > } > } > thread.start() > thread > } > val streamTableName = "uniqdata" > sql(s"CREATE TABLE uniqdata (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION > string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 > bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 > decimal(36,36),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 > int) STORED BY 'org.apache.carbondata.format' > TBLPROPERTIES('streaming'='true')") > sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/2000_UniqData.csv' into table > uniqdata OPTIONS( > 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')") > val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore. > lookupRelation(Some("default"), > streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable > val tablePath = > CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) > val port = 8006 > val serverSocket = new ServerSocket(port) > val socketThread = writeSocket(serverSocket) > val stre
[jira] [Resolved] (SPARK-22841) Select regexp_extract from table with where clause having is null throws indexoutofbounds exception
[ https://issues.apache.org/jira/browse/SPARK-22841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-22841. --- Resolution: Invalid As the error says, there's no matching group in your regex, so this fails. Because it's a 'programmer error' and not a function of the data, this is the kind of result I'd expect rather than NULL or something. > Select regexp_extract from table with where clause having is null throws > indexoutofbounds exception > --- > > Key: SPARK-22841 > URL: https://issues.apache.org/jira/browse/SPARK-22841 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Chetan Bhat > > Steps : > Thrift server is started using the command - bin/spark-submit --master > yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G > --num-executors 3 --class > org.apache.carbondata.spark.thriftserver.CarbonThriftServer > /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar > "hdfs://hacluster/user/sparkhive/warehouse" > Spark shell is launched using the command - bin/spark-shell --master > yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G > --num-executors 3 --jars > /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar > From Spark shell the streaming table is created and data is loaded to the > streaming table. > import java.io. > {File, PrintWriter} > import java.net.ServerSocket > import org.apache.spark.sql. > {CarbonEnv, SparkSession} > import org.apache.spark.sql.hive.CarbonRelation > import org.apache.spark.sql.streaming. > {ProcessingTime, StreamingQuery} > import org.apache.carbondata.core.constants.CarbonCommonConstants > import org.apache.carbondata.core.util.CarbonProperties > import org.apache.carbondata.core.util.path. > {CarbonStorePath, CarbonTablePath} > CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, > "/MM/dd") > import org.apache.spark.sql.CarbonSession._ > val carbonSession = SparkSession. > builder(). > appName("StreamExample"). > getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store") > carbonSession.sparkContext.setLogLevel("INFO") > def sql(sql: String) = carbonSession.sql(sql) > def writeSocket(serverSocket: ServerSocket): Thread = { > val thread = new Thread() { > override def run(): Unit = { > // wait for client to connection request and accept > val clientSocket = serverSocket.accept() > val socketWriter = new PrintWriter(clientSocket.getOutputStream()) > var index = 0 > for (_ <- 1 to 1000) { > // write 5 records per iteration > for (_ <- 0 to 100) > { index = index + 1 socketWriter.println(index.toString + ",name_" + index + > ",city_" + index + "," + (index * 1.00).toString + ",school_" + index + > ":school_" + index + index + "$" + index) } > socketWriter.flush() > Thread.sleep(2000) > } > socketWriter.close() > System.out.println("Socket closed") > } > } > thread.start() > thread > } > def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, > tableName: String, port: Int): Thread = { > val thread = new Thread() { > override def run(): Unit = { > var qry: StreamingQuery = null > try > { val readSocketDF = spark.readStream .format("socket") .option("host", > "10.18.98.34") .option("port", port) .load() qry = readSocketDF.writeStream > .format("carbondata") .trigger(ProcessingTime("5 seconds")) > .option("checkpointLocation", tablePath.getStreamingCheckpointDir) > .option("tablePath", tablePath.getPath).option("tableName", tableName) > .start() qry.awaitTermination() } > catch > { case ex: Throwable => ex.printStackTrace() println("Done reading and > writing streaming data") } > finally > { qry.stop() } > } > } > thread.start() > thread > } > val streamTableName = "uniqdata" > sql(s"CREATE TABLE uniqdata (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION > string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 > bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 > decimal(36,36),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 > int) STORED BY 'org.apache.carbondata.format' > TBLPROPERTIES('streaming'='true')") > sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/2000_UniqData.csv' into table > uniqdata OPTIONS( > 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')") > val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore. > lookupRelation(Some("default"), > streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable > val ta