Spark Example: Word Count in Scala

SPARK 2017-01-14

Example Description

  A Spark word-count program written in Scala.

Test Data

1.txt

hello world
hello hadoop
hadoop hive

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.mn1024.spark</groupId>
    <artifactId>spark_demo_wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.mn1024.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
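
With this pom.xml, running mvn clean package compiles the Scala sources via the scala-maven-plugin, then the maven-shade-plugin bundles the dependencies into a single runnable jar with cn.mn1024.spark.WordCount recorded as the main class. Assuming a local Spark installation, the result can be submitted with spark-submit --class cn.mn1024.spark.WordCount --master local[2] target/spark_demo_wordcount-1.0-SNAPSHOT.jar; during development you can also run WordCount directly from the IDE, since the master is hard-coded to local[2] below.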

Program Code

package cn.mn1024.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * A Spark word-count program written in Scala.
  * @author GuangLing_Lin
  */
object WordCount {

  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the appName and master URL; local[2] means run locally with 2 threads
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

    // Create the SparkContext. It is the entry point of the program: it creates the DAGScheduler and TaskScheduler and coordinates all computing resources
    val sc: SparkContext = new SparkContext(sparkConf)

    // Set the log output level
    sc.setLogLevel("WARN")

    // Read the data file
    val file: RDD[String] = sc.textFile("F://wordcount//input//1.txt")

    // Split each line into words and flatten the result
    val words: RDD[String] = file.flatMap(_.split(" "))

    // Map each word to a count of 1, i.e. (word, 1)
    val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1))

    // Sum the counts of identical words; the first underscore is the accumulated value, the second the next value
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // Sort by word count in descending order
    val sortResult: RDD[(String, Int)] = result.sortBy(_._2, false)

    // Collect the results to the driver
    val finalResult: Array[(String, Int)] = sortResult.collect()

    // Print the results
    finalResult.foreach(x => println(x))

    sc.stop()
  }

}
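
The same pipeline can also be tried interactively in spark-shell, where sc is already created (use :paste for multi-line input). Below is a minimal sketch of the chain, annotated with the intermediate contents for the 1.txt above; element order within an RDD may vary between runs:

sc.textFile("F://wordcount//input//1.txt")  // "hello world", "hello hadoop", "hadoop hive"
  .flatMap(_.split(" "))                    // hello, world, hello, hadoop, hadoop, hive
  .map((_, 1))                              // (hello,1), (world,1), (hello,1), (hadoop,1), ...
  .reduceByKey(_ + _)                       // (hello,2), (hadoop,2), (hive,1), (world,1)
  .sortBy(_._2, ascending = false)          // descending by count
  .collect()                                // safe here: the result is tiny
  .foreach(println)

On a real cluster you would typically replace collect()/println with saveAsTextFile(outputPath), so the results are written out by the executors instead of being pulled into the driver.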

Test Output

(hello,2)
(hadoop,2)
(hive,1)
(world,1)
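
Note that hello and hadoop both occur twice, and sortBy(_._2, false) defines no order between equal counts, so (hadoop,2) may just as well print before (hello,2) on another run. If a deterministic order matters, sort on a composite key; a minimal variant of the sort step in the program above (not part of the original code):

    // Descending by count, ties broken alphabetically by word
    val sortResult: RDD[(String, Int)] = result.sortBy(x => (-x._2, x._1))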

Debug

  • Error: scalac: bad option: '-make:transitive'
  • Fix: a Scala version issue; Scala 2.11 no longer supports the -make option. Remove this argument from pom.xml:

    <configuration>
        <args>
            <!-- <arg>-make:transitive</arg> -->
            <arg>-dependencyfile</arg>
            <arg>${project.build.directory}/.scala_dependencies</arg>
        </args>
    </configuration>
  • Error: java.lang.NoClassDefFoundError: org/apache/log4j/Level (full stack trace below)
  • Fix: a jar conflict; delete the Log4j artifacts from the local Maven repository so that Maven re-downloads them.

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/Level
    at org.apache.spark.internal.Logging$class.initializeLogging(Logging.scala:111)
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:102)
    at org.apache.spark.SparkContext.initializeLogIfNecessary(SparkContext.scala:74)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.SparkContext.log(SparkContext.scala:74)
    at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
    at org.apache.spark.SparkContext.logInfo(SparkContext.scala:74)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:185)
    at cn.mn1024.spark.WordCount$.main(WordCount.scala:14)
    at cn.mn1024.spark.WordCount.main(WordCount.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 10 more
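
A NoClassDefFoundError like this usually means the jar that should contain the class is corrupted or shadowed rather than missing from the pom: log4j arrives transitively through spark-core and hadoop-client here. Running mvn dependency:tree shows which dependency brings it in, and deleting the log4j directory under the local repository (typically ~/.m2/repository/log4j; the exact path is machine-specific) forces a clean re-download on the next build.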

Behind every success lie countless nights that no one ever sees.

Because

the night is the best time to overtake your rivals.

===================== 码农1024 =====================


This article was written by 蔺光岭 and is licensed under Creative Commons Attribution 4.0 (CC BY 4.0). It may be freely reposted and quoted, provided the author is credited and the source is cited.
