首家大数据培训挂牌机构 股票代码:837906 | EN CN
【小牛原创】Spark SQL 从入门到实战 -- spark sql 1.6版本相关api
【小牛原创】Spark SQL 从入门到实战 -- 概述
Spark Streaming:大规模流式数据处理
spark RDD 相关需求
spark RDD 高级应用
Spark手册 - load&save
Spark手册 - debug
Spark手册 - cache&checkpoint
Spark手册 - RDD Action API
Spark手册 - Partitioner源码
Spark手册 - RDD Transformation API
Spark手册 - RDD的依赖关系
Spark手册 - RDD入门
Spark手册 - 远程debug
Spark手册 - 在IDEA中编写WordCount程序(3)
Spark手册 - 在IDEA中编写WordCount程序(2)
Spark手册 - 在IDEA中编写WordCount程序(1)
Spark手册 - 执行Spark程序
Spark手册 - 集群安装
20页PPT|视频类网站大数据生态 Spark在爱奇艺的应用实践
Spark机器学习入门实例——大数据集(30+g)二分类
Spark官方文档中文翻译:Spark SQL 之 Data Sources
使用Spark MLlib来训练并服务于自然语言处理模型
Spark知识体系完整解读
案例 :Spark应用案例现场分享(IBM Datapalooza)
最全的Spark基础知识解答
Spark在GrowingIO数据无埋点全量采集场景下的实践
Apache Spark探秘:三种分布式部署方式比较
Apache Spark探秘:多进程模型还是多线程模型?
Apache Spark探秘:实现Map-side Join和Reduce-side Join
Apache Spark探秘:利用Intellij IDEA构建开发环境
spark on yarn的技术挑战
Apache Spark学习:将Spark部署到Hadoop 2.2.0上
Hadoop与Spark常用配置参数总结
基于Spark Mllib,SparkSQL的电影推荐系统
spark作业调优秘籍,解数据倾斜之痛
Spark入门必学:预测泰坦尼克号上的生还情况
小牛学堂浅谈基于Spark大数据平台日志审计系统的设计与实现
【Hadoop Summit Tokyo 2016】使用基于Lambda架构的Spark的近实时的网络异常检测和流量分析
Spark编程环境搭建经验分享
Spark技术在京东智能供应链预测的应用
spark中textFile、groupByKey、collect、flatMap、map结合小案例
Spark中DataFrame的schema讲解
深度剖析Spark分布式执行原理
【Spark Summit East 2017】从容器化Spark负载中获取的经验
内存分析技术哪家强?Spark占几何
Spark系列之一:Spark,一种快速数据分析替代方案
6种最常见的Hadoop和Spark项目
Hadoop vs Spark
Hadoop与Spark常用配置参数总结
Spark RPC通信层设计原理分析
Spark Standalone架构设计要点分析
Spark UnifiedMemoryManager内存管理模型分析
网易的Spark技术分享

Spark机器学习入门实例——大数据集(30+g)二分类

于2017-04-01由小牛君创建

分享到:


大数据

本篇教程将引领大家,通过使用spark的机器学习性能和 Scala ,练习一个基于超出内存可加载范围的数据集的逻辑回归分类器(即LR分类器)。

假如你想创建一个机器学习模型,但却发现你的输入数据集与你的计算机内存不相符?对于多机器的计算集群环境中通常可以使用如Hadoop和Apache Spark分布式计算工具。然而,Apache Spark能够在本地机器独立模式上,甚至在当输入数据集大于你的计算机内存时通过创建模型处理你的数据。

大数据培训,就上小牛学堂专稿,原文作者:Dmitry Petrov 本文由大数据培训,就上小牛学堂翻译组-元曜+BI+刘亭亭翻译,任何不标明译者和出处以及本文链接http://www.36dsj.com/archives/37699 的均为侵权。

在这篇文章里,通过使用一个34.6千兆字节的输入数据集创建一个二进制分类模型,为您展现一个Apache Spark的端对端脚本。

可以在您的计算机上运行进行测试。

大数据培训,就上小牛学堂

1.输入数据和预期结果

在上一篇文章我们讨论了“How To Find Simple And Interesting Multi-Gigabytes Data Set”,本文将使用上文中提及数据集的Posts.xml文件。文件大小是34.6千兆字节,这个xml文件包含stackoverflow.com文章数据作为xml属性:

  • 标题 – 文章标题
  • 主体 – 文章文本
  • 标签 – 文章的标签列表
  • 10+ 更多的xml -我们不需要使用的属性

关于stackoverflow.com的Posts.xml完整数据集信息请点击:https://archive.org/details/stackexchange.

另外我创建一个较小版本的这种文件,里面只有10个条目或文章。此文件包含一个小尺寸的原始数据集,这个数据是被知识共享许可批准的。

如你所料,这个小文件并不是模型训练的最好的选择(这个小模型训练文件并不是最好的选择),这个文件仅适用于实验数据准备代码。然而,本文中的端对端Spark脚本也适用于这个小文件,文件下载请点击这里。

我们的目标是创建一个可基于主体和标题预测文章标签的预测模型。为了精简任务和减少代码数量,我们将联接标题和主体并作为一个单独的文本列。

可想而知,这个模型在stackoverflow.com网站上是怎样工作的——用户键入一个问题,网站自动给予标签建议。

假设我们需要尽可能多的正确的标签,并且用户将消除不必要的标签。由于这个假设我们将选择撤销作为我们的模型高优先级目标。

2.二进制和多标签分类

栈溢出标记预测问题属于多标签分类的一种但并不唯一,因为模型应当预测许多分类。相同的文本将被归类为“Java”和“多线程”。注意多标签分类是不同的问题的一个泛化 ——多分类问题,从一组类预测为仅仅一个类。为了简化我们的第一个Apache Spark问题以及减少代码数量,让我们开始简化问题吧。取代练习一个多标记分类器,我们来对一个给定的标签练习一个简单的二进制分类器。例如,对标签“Java”,创建一个能够预测关于Java语言文章的分类器。

通过使用这个简单方法,可以创建几乎所有常见标签的分类器(Java, C++, Python, multi-threading等)。这是一个简单而易学的方法。然而,在实践中却并不完美,因为拆分预测模型基于各自的分类器,你忽略了类之间的相关性。另一个原因——训练许多分类器可能需要大量的计算。

3.在独立模式下设置和运行Apache Spark

如果你的机器上没有Apache Spark,从Spark官网上可以很容易下载到: http://spark.apache.org/,请下载使用1.5.1版本。spark-1.5.1版本的下载链接—— http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz

如果Java在你的计算机上已经安装了,那么你可以准备启动Spark了,如果没有的话——请先安装Java。

在Unix系统和Macs系统上,解压缩该文件并复制到任意目录中。现在这个目录就是Spark的目录了。

运行Spark master:

sbin/start-master.sh

运行spark slave:

sbin/start-slaves.sh

运行Spark shell:

bin/spark-shell

Spark shell可以交互模式运行Scala命令。

Windows用户可以在这里找到命令:http://nishutayaltech.blogspot.in/2015/04/how-to-run-apache-spark-on-windows7-in.html

如果你工作在Hadoop环境的集群模式下,我假定你已经知道怎样运行Spark shell了。

4.导入库

对于这个端到端场景我们将使用Scala,也是Apache Spark的主要编程语言。

// General purpose library
import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification
    .LogisticRegression
import org.apache.spark.mllib.evaluation
    .BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline

5.解析XML

我们需要从输入的xml文件中提取主体、文本和标签,并用这些列创建一个单独的data-frame。首先,让我们移除xml的页眉和页脚。我假定输入文件和Spark shell命令位于同一目录下。

val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
   filter(!_.startsWith("<?xml version=")).
   filter(_ != "<posts>").
   filter(_ != "</posts>")

Spark具有良好的函数可用于解析json和csv格式。对于Xml,我们需要编写一些额外的代码行,通过规格化编码模式来创建一个数据框架。

注意,Scala语言自动转换所有xml代码,像“<a>”转换到实际标签“<a>”。我们也将连接标题和主体、移除所有的不必要的标签以及来自主体和所有空间副本的新行字符。

val postsRDD = postsXml.map { s =>
   val xml = XML.loadString(s)

   val id = (xml \ "@Id").text
   val tags = (xml \ "@Tags").text

   val title = (xml \ "@Title").text
   val body = (xml \ "@Body").text
   val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
   val text = (title + " " + bodyPlain).replaceAll("\n", 
      " ").replaceAll("( )+", " ");

   Row(id, tags, text)
}

创建一个data-frame,schema应当应用于RDD(弹性分布式数据集,Resilient Distributed Datasets)

val schemaString = "Id Tags Text"
val schema = StructType(
   schemaString.split(" ").map(fieldName => 
      StructField(fieldName, StringType, true)))

val postsDf = sqlContext.createDataFrame(postsRDD, schema)

现在你可以查看一下你的数据框架了。

6.准备练习和测试数据集

下一步——为一个二进制分类器创建一个二进制标签。对这个代码示例,我们将用“java”作为一个标签,我们想要用一个二进制分类器来预测。所有存在“java”标签的行被标记为“1”,否则被标记为“0”。让我们来辨别我们的目标标记“java”和基于这个标记创建二进制标签。

val targetTag = "java"
val myudf: (String => Double) = (str: String) => 
    {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label", 
    sqlfunc(col("Tags")) )

通过使用新标签,数据集可分为消极的子集和积极的子集。

val positive = postsLabeled.filter('Label > 0.0)
val negative = postsLabeled.filter('Label < 1.0)

我们将使用数据的90%用于模型训练和另外10%作为数据集测试。让我们通过独立抽样积极和消极数据集来创建一个训练数据集吧。

val positiveTrain = positive.sample(false, 0.9)
val negativeTrain = negative.sample(false, 0.9)
val training = positiveTrain.unionAll(negativeTrain)

测试数据集应当包括训练数据集中不包括的所有的行。并且再次地——分别地处理积极样例和消极样例

val negativeTrainTmp = negativeTrain
    .withColumnRenamed("Label", "Flag").select('Id, 'Flag)

val negativeTest = negative.join( negativeTrainTmp, 
    negative("Id") === negativeTrainTmp("Id"), 
    "LeftOuter").filter("Flag is null")
    .select(negative("Id"), 'Tags, 'Text, 'Label)

val positiveTrainTmp = positiveTrain
    .withColumnRenamed("Label", "Flag")
    .select('Id, 'Flag)

val positiveTest = positive.join( positiveTrainTmp, 
    positive("Id") === positiveTrainTmp("Id"), 
    "LeftOuter").filter("Flag is null")
    .select(positive("Id"), 'Tags, 'Text, 'Label)

val testing = negativeTest.unionAll(positiveTest)

7.训练模型

定义训练参数:

1.特征数目

2.回归参数

3.梯度下降的样本点数目

Spark API 创建一个基于来自 data-frame和训练参数的列的模型 :

val numFeatures = 64000
val numEpochs = 30
val regParam = 0.02

val tokenizer = new Tokenizer().setInputCol("Text")
    .setOutputCol("Words")

val hashingTF = new  org.apache.spark.ml.feature
    .HashingTF().setNumFeatures(numFeatures).

    setInputCol(tokenizer.getOutputCol)
    .setOutputCol("Features")

val lr = new LogisticRegression().setMaxIter(numEpochs)
    .setRegParam(regParam)setFeaturesCol("Features")
    .setLabelCol("Label").setRawPredictionCol("Score")
    .setPredictionCol("Prediction")

val pipeline = new Pipeline()
    .setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(training)

8.测试模型

这是我们最后一段二进制“Java”分类器代码,将返回一个预测值(0.0或1.0):

val testTitle = 
 "Easiest way to merge a release into one JAR file"

val testBoby = 
 """Is there a tool or script which easily merges a bunch 
 of href="http://en.wikipedia.org/wiki/JAR_%28file_format
 %29" JAR files into one JAR file? A bonus would be to 
 easily set the main-file manifest and make it executable.
 I would like to run it with something like: As far as I 
 can tell, it has no dependencies which indicates that it 
 shouldn't be an easy single-file tool, but the downloaded
 ZIP file contains a lot of libraries."""

val testText = testTitle + testBody

val testDF = sqlContext
   .createDataFrame(Seq( (99.0, testText)))
   .toDF("Label", "Text")

val result = model.transform(testDF)

val prediction = result.collect()(0)(6)
   .asInstanceOf[Double]

print("Prediction: "+ prediction)

让我们评估基于训练数据集的模型的质量。

val testingResult = model.transform(testing)

val testingResultScores = testingResult
   .select("Prediction", "Label").rdd
   .map(r => (r(0).asInstanceOf[Double], r(1)
   .asInstanceOf[Double]))

val bc = 
   new BinaryClassificationMetrics(testingResultScores)

val roc = bc.areaUnderROC

print("Area under the ROC:" + roc)

如果你使用的是小规模的数据集,那么你的模型质量可能不是最好的。ROC值下的面积将非常低(接近50%),意味着一个差的模型质量。如果使用的是整个的Post.xml数据集,质量就不会这么差,ROC下的面积是0.64。或许你可以通过考虑不同的转换,如TF-IDF和规范化来改善结果,但本文中不再详述。

结语

如果你的数据集超出计算内存承受范围的话,Apache Spark可能是数据处理和机器学习脚本的一个非常好的选择。在一个Hadoop Yarn集群模式环境中使用Spark或许不那么容易,但是,在本地或独立模式下,Spark跟其他分析工具一样简单。

Dmitry Petrov

By Dmitry Petrov, Microsoft and FullStackML.

个人简历: Dmitry Petrov,Ph.D.微软数据科学家。

原文:Beginners Guide: Apache Spark Machine Learning with Large Data

End.