Bago,
Finally I was able to create a reproducer that fails consistently. I think the issue
is caused by the VectorAssembler in the model. In the new code I have 2
features (1 text and 1 number), which have to go through a VectorAssembler
before being passed to LogisticRegression. Code and test data below.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
 * A simple pipeline that combines a text feature and a numeric feature to
 * classify tweet sentiment (Positive/Negative).
 */
public class StreamingIssueCountVectorizerSplitFailed {
public static void main(String[] args) throws Exception {
SparkSession sparkSession =
SparkSession.builder().appName("StreamingIssueCountVectorizer")
.master("local[2]")
.getOrCreate();
List<Row> _trainData = Arrays.asList(
RowFactory.create("sunny fantastic day", 1, "Positive"),
RowFactory.create("fantastic morning match", 1, "Positive"),
RowFactory.create("good morning", 1, "Positive"),
RowFactory.create("boring evening", 5, "Negative"),
RowFactory.create("tragic evening event", 5, "Negative"),
RowFactory.create("today is bad ", 5, "Negative")
);
List<Row> _testData = Arrays.asList(
RowFactory.create("sunny morning", 1),
RowFactory.create("bad evening", 5)
);
StructType schema = new StructType(new StructField[]{
new StructField("tweet", DataTypes.StringType, false,
Metadata.empty()),
new StructField("time", DataTypes.IntegerType, false,
Metadata.empty()),
new StructField("sentiment", DataTypes.StringType, true,
Metadata.empty())
});
StructType testSchema = new StructType(new StructField[]{
new StructField("tweet", DataTypes.StringType, false,
Metadata.empty()),
new StructField("time", DataTypes.IntegerType, false,
Metadata.empty())
});
Dataset<Row> trainData = sparkSession.createDataFrame(_trainData,
schema);
Dataset<Row> testData = sparkSession.createDataFrame(_testData,
testSchema);
StringIndexerModel labelIndexerModel = new StringIndexer()
.setInputCol("sentiment")
.setOutputCol("label")
.setHandleInvalid("skip")
.fit(trainData);
Tokenizer tokenizer = new Tokenizer()
.setInputCol("tweet")
.setOutputCol("words");
CountVectorizer countVectorizer = new CountVectorizer()
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("wordfeatures")
.setVocabSize(3)
.setMinDF(2)
.setMinTF(2)
.setBinary(true);
VectorAssembler vectorAssembler = new VectorAssembler()
.setInputCols(new String[]{"wordfeatures", "time"})
.setOutputCol("features");
Dataset<Row> words = tokenizer.transform(trainData);
CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
IndexToString labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predicted")
.setLabels(labelIndexerModel.labels());
countVectorizerModel.setMinTF(1);
Pipeline pipeline = new Pipeline()
.setStages(
new PipelineStage[]{labelIndexerModel, tokenizer,
countVectorizerModel, vectorAssembler,
lr, labelConverter});
ParamMap[] paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam(), new double[]{0.1, 0.01})
.addGrid(lr.fitIntercept())
.addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
.build();
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator();
evaluator.setLabelCol("label");
evaluator.setPredictionCol("prediction");
TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.7);
// Fit the pipeline to training documents.
TrainValidationSplitModel trainValidationSplitModel =
trainValidationSplit.fit(trainData);
trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");
TrainValidationSplitModel _loadedModel = TrainValidationSplitModel
.load("/tmp/CountSplit.model");
PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();
//Test on non-streaming data
Dataset<Row> predicted = loadedModel.transform(testData);
predicted.show();
List<Row> _rows = predicted.select("tweet", "predicted").collectAsList();
for (Row r : _rows) {
System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
}
//Test on streaming data
Dataset<Row> lines = sparkSession.readStream().option("sep", ",")
.schema(testSchema).option("header", "true").option("inferSchema", "true")
.format("com.databricks.spark.csv")
.load("file:///home/davis/Documents/Bugs/StreamingTwitter1");
StreamingQuery query = loadedModel.transform(lines).writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
}
}
*Test data csv file*
tweet,time
Today is a bright sunny day,2
How is everyone feeling in office?,2
I want beef cake. Where is it?,2
The weather sucks today,2
I like Vat69.,5
I don't care,5
Wassup,5
Skyfall sucks!,5
*Output*
*--------*
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|        tweet|time|           words| wordfeatures|         features|       rawPrediction|         probability|prediction|predicted|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|sunny morning|   1|[sunny, morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|       0.0| Positive|
|  bad evening|   5|  [bad, evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|       1.0| Negative|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
[sunny morning], prediction=Positive
[bad evening], prediction=Negative
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
at StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerSplit.java:164)
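
The bottom frames of the trace point at the cause: VectorAssembler.transform falls back to Dataset.first() (VectorAssembler.scala:57/88, behind the Option.getOrElse) to discover the size of the "wordfeatures" vector column when that column carries no size metadata, and an eager action like first() is not allowed on a streaming Dataset. The ML pipeline is not even needed to hit this; the minimal sketch below (my own illustration, reusing sparkSession, testSchema and the same file path from the program above) fails with the same AnalysisException:

// Any eager action on a streaming Dataset, e.g. first(), throws
// "Queries with streaming sources must be executed with writeStream.start()".
// VectorAssembler makes exactly such a call internally when it has to look at
// a row to work out the assembled vector size.
Dataset<Row> streamingLines = sparkSession.readStream()
    .schema(testSchema)
    .option("header", "true")
    .format("com.databricks.spark.csv")
    .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");
Row firstRow = streamingLines.first(); // org.apache.spark.sql.AnalysisException

Presumably, if the "wordfeatures" column carried its vector size in the column metadata, VectorAssembler would never need to call first() and the streaming transform would go through; the same pipeline works fine on batch data, as shown above.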