Reasons to Use Spark

Hadoop MapReduce is slow because of replication, serialization, and disk I/O. It is inefficient for iterative algorithms such as machine learning, graph processing, and network analysis, and it is not ideal for interactive data mining such as R, ad hoc reporting, and searching.

Because of in-memory processing, Spark's computations are very fast. Developers can write iterative algorithms without writing out a result set after each pass through the data. Spark also provides an integrated framework for advanced analytics such as graph processing, stream processing, and machine learning, which simplifies integration.
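As a quick sketch of why this matters for iterative work, the hypothetical fragment below caches an RDD in memory so that each pass reuses the cached partitions instead of re-reading HDFS. It assumes an existing JavaSparkContext sc and the same Spark 1.x Java API imports used by the full example later in this chapter; the input path is illustrative.

// Hypothetical fragment: iterative passes over a cached RDD.
// Assumes a JavaSparkContext `sc`; the input path is illustrative.
JavaRDD<String> lines = sc.textFile("hdfs:///user/cloudera/input/wordcount.txt");
lines.cache();  // keep the partitions in memory after the first pass

for (int i = 0; i < 10; i++) {
  // each pass after the first reads from memory, not from disk
  long count = lines.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) {
      return s.contains("spark");
    }
  }).count();
  System.out.println("pass " + i + ": " + count);
}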

[Figure: SparkSurvey. Picture source: 2015 Typesafe survey.]

 

Spark extends the popular MapReduce model to efficiently support more types of computations, including interactive queries and stream processing. Speed is important in processing large datasets, as it means the difference between exploring data interactively and waiting minutes or hours. One of the main features Spark offers for speed is the ability to run computations in memory, but the system is also more efficient than MapReduce for complex applications running on disk.

Still, Spark is newer than Hadoop and has a smaller user base.

According to https://spark.apache.org/, Apache Spark has the following advantages:

  • Speed
    Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
    Spark has an advanced directed acyclic graph (DAG) execution engine that supports cyclic data flow and in-memory computing.
  • Ease of Use
    Write applications quickly in Java, Scala, or Python.
    Spark offers over 80 high-level operators that make it easy to build parallel apps (see the sketch after this list), and we can use it interactively from the Scala and Python shells.
  • Generality
    Combine SQL, streaming, and complex analytics.
    Spark powers a stack of high-level tools including Spark SQL, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.
  • Runs Everywhere
    Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.
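To give a flavor of those high-level operators (as promised in the Ease of Use item above), here is a hypothetical fragment that chains a few of them. It assumes an existing JavaRDD<String> named lines, for example from sc.textFile(...), and the Spark 1.x Java API used elsewhere in this chapter.

// Hypothetical fragment: chaining high-level operators on an existing RDD.
// `lines` is assumed to be a JavaRDD<String>, e.g. from sc.textFile(...).
long longWordCount = lines
  .flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));  // tokenize each line
    }
  })
  .filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String w) {
      return w.length() > 5;  // keep only longer words
    }
  })
  .distinct()  // drop duplicate words
  .count();    // action: triggers the actual computation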


Our example app is a variant of WordCount, the classic MapReduce example. In this version, the goal is to learn the distribution of letters in the most popular words in our corpus. In other words, we want to:

  1. Read an input set of text documents.
  2. Count the number of times each word appears.
  3. Filter out all words that show up less than a million times.
  4. For the remaining set, count the number of times each letter occurs.

In MapReduce, this would require two MapReduce jobs, as well as persisting the intermediate data to HDFS between them. In Spark, however, we can write it as a single job in about 90 percent fewer lines of code; a condensed sketch follows, and the full Java class appears in the "Using Spark's Java API" section below.
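As a condensed sketch of the first three steps, here is a hypothetical Java 8 lambda version (the full example later in this chapter uses anonymous inner classes instead, which also work on Java 7). It assumes a JavaSparkContext sc, an input path, and an effectively final int threshold:

// Hypothetical condensed sketch of steps 1-3 in Java 8 lambda style.
// Assumes `sc`, `path`, and an effectively final `threshold`.
JavaPairRDD<String, Integer> filtered = sc.textFile(path)
    .flatMap(s -> Arrays.asList(s.split(" ")))   // 1. read and tokenize
    .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
    .reduceByKey((a, b) -> a + b)                // 2. count each word
    .filter(t -> t._2() >= threshold);           // 3. drop words below the threshold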

 

Note

In this chapter, we'll run Spark 1.2 with CDH 5.3 via Maven. The references used are:

 

spark-env.sh

Here is the /etc/spark/conf/spark-env.sh that comes with the QuickStart VM:

#!/usr/bin/env bash
##
# Generated by Cloudera Manager and should not be modified directly
##

export SPARK_HOME=/usr/lib/spark
export STANDALONE_SPARK_MASTER_HOST=quickstart.cloudera
export SPARK_MASTER_PORT=7077
export DEFAULT_HADOOP_HOME=/usr/lib/hadoop

### Path of Spark assembly jar in HDFS
export SPARK_JAR_HDFS_PATH=${SPARK_JAR_HDFS_PATH:-/user/spark/share/lib/spark-assembly.jar}

### Let's run everything with JVM runtime, instead of Scala
export SPARK_LAUNCH_WITH_SCALA=0
export SPARK_LIBRARY_PATH=${SPARK_HOME}/lib
export SCALA_LIBRARY_PATH=${SPARK_HOME}/lib
export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST

export HADOOP_HOME=${HADOOP_HOME:-$DEFAULT_HADOOP_HOME}

if [ -n "$HADOOP_HOME" ]; then
  export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${HADOOP_HOME}/lib/native
  export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:${HADOOP_HOME}/lib/native
fi

export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
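Since the file exports STANDALONE_SPARK_MASTER_HOST=quickstart.cloudera and SPARK_MASTER_PORT=7077, the VM's standalone master URL is spark://quickstart.cloudera:7077. As a hedged sketch, an application could select a master in code through SparkConf, although the examples later in this chapter pass --master to spark-submit instead:

// Hypothetical fragment: choosing a master URL in code.
// The run example later in this chapter uses spark-submit --master instead.
SparkConf localConf = new SparkConf()
    .setAppName("Spark Count")
    .setMaster("local");  // run with a single local thread
SparkConf standaloneConf = new SparkConf()
    .setAppName("Spark Count")
    .setMaster("spark://quickstart.cloudera:7077");  // the VM's standalone master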

 

Spark in QuickStart VM

[Figure: SparkQuickStartVM]

[cloudera@quickstart ~]$ hadoop fs -ls /user
Found 9 items
drwxr-xr-x   - cloudera cloudera            0 2015-03-30 15:35 /user/cloudera
drwxr-xr-x   - flume    flume               0 2015-04-01 19:25 /user/flume
drwxr-xr-x   - hdfs     supergroup          0 2015-04-05 11:11 /user/hdfs
drwxr-xr-x   - mapred   hadoop              0 2015-03-15 14:08 /user/history
drwxrwxrwx   - hive     hive                0 2014-12-18 04:33 /user/hive
drwxrwxr-x   - hue      hue                 0 2015-03-21 15:34 /user/hue
drwxrwxrwx   - oozie    oozie               0 2014-12-18 04:34 /user/oozie
drwxr-xr-x   - sample   sample              0 2015-03-14 22:05 /user/sample
drwxr-xr-x   - spark    spark               0 2014-12-18 04:34 /user/spark

 

Using Spark’s Java API
package org.apache.spark.examples;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {
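    // args[0]: input path (on HDFS when submitted to the cluster); args[1]: minimum occurrence threshold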
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
    final int threshold = Integer.parseInt(args[1]);
    
    // split each document into words
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );
    
    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );
    
    // filter out words with less than threshold occurrences
    JavaPairRDD<String, Integer> filtered = counts.filter(
      new Function<Tuple2<String, Integer>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, Integer> tup) {
          return tup._2() >= threshold;
        }
      }
    );
    
    // count characters
    JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
      new FlatMapFunction<Tuple2<String, Integer>, Character>() {
        @Override
        public Iterable<Character> call(Tuple2<String, Integer> s) {
          Collection<Character> chars = new ArrayList<Character>(s._1().length());
          for (char c : s._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }
    ).mapToPair(
      new PairFunction<Character, Character, Integer>() {
        @Override
        public Tuple2<Character, Integer> call(Character c) {
          return new Tuple2<Character, Integer>(c, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );
    
    System.out.println(charCounts.collect());
  }
}
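One design note on the last line: collect() pulls the entire result RDD back to the driver as a java.util.List, which is fine here because the final per-letter counts are tiny. For large results, an action such as saveAsTextFile, which writes the RDD out to HDFS, would be the safer choice.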

 

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>spark.examples</groupId>
  <artifactId>javawordcount</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>javawordcount</name>
  
  <url>http://maven.apache.org</url>
  
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.4.0</version>
    </dependency>
  </dependencies> 
  
</project>

 

jar file

[Figure: project directory tree]

To generate our app jar, we simply run:

[cloudera@quickstart JavaWordCount]$ pwd
/home/cloudera/workspace/JavaWordCount
[cloudera@quickstart JavaWordCount]$ mvn package

[Figure: project tree after mvn package]

Run

Here is the file structure provided by the CDH 5 QuickStart VM:
[Figure: FileBrowser.png]

The input directory:
[Figure: ClouderaInputDir.png]

Before running, place the input file into a directory on HDFS; the repository supplies an example input file in its data directory. A hypothetical session (the local file name and HDFS directory are illustrative) might look like this:
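[cloudera@quickstart JavaWordCount]$ hadoop fs -mkdir -p input
[cloudera@quickstart JavaWordCount]$ hadoop fs -put data/wordcount.txt input/

To run the Spark program, we use the spark-submit script: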

[cloudera@quickstart JavaWordCount]$ spark-submit --class org.apache.spark.examples.JavaWordCount --master local target/javawordcount-0.0.1-SNAPSHOT.jar input/wordcount.txt 2

The output from the run looks like this; the final line printed is the collected list of (letter, count) pairs for the words that met the threshold:

15/05/01 08:30:28 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/01 08:30:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/01 08:30:28 INFO SecurityManager: Changing view acls to: cloudera
15/05/01 08:30:28 INFO SecurityManager: Changing modify acls to: cloudera
15/05/01 08:30:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
15/05/01 08:30:29 INFO Slf4jLogger: Slf4jLogger started
15/05/01 08:30:29 INFO Remoting: Starting remoting
15/05/01 08:30:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:48450]
15/05/01 08:30:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:48450]
15/05/01 08:30:29 INFO Utils: Successfully started service 'sparkDriver' on port 48450.
15/05/01 08:30:29 INFO SparkEnv: Registering MapOutputTracker
15/05/01 08:30:29 INFO SparkEnv: Registering BlockManagerMaster
15/05/01 08:30:29 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150501083029-cecf
15/05/01 08:30:29 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/05/01 08:30:30 INFO HttpFileServer: HTTP File server directory is /tmp/spark-a5c025c2-9b7f-48a4-914f-cac6f411f231
15/05/01 08:30:30 INFO HttpServer: Starting HTTP Server
15/05/01 08:30:30 INFO Utils: Successfully started service 'HTTP file server' on port 45085.
15/05/01 08:30:30 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/01 08:30:30 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040
15/05/01 08:30:30 INFO SparkContext: Added JAR file:/home/cloudera/workspace/JavaWordCount/target/javawordcount-0.0.1-SNAPSHOT.jar at http://10.0.2.15:45085/jars/javawordcount-0.0.1-SNAPSHOT.jar with timestamp 1430494230600
15/05/01 08:30:30 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.2.15:48450/user/HeartbeatReceiver
15/05/01 08:30:30 INFO NettyBlockTransferService: Server created on 37295
15/05/01 08:30:30 INFO BlockManagerMaster: Trying to register BlockManager
15/05/01 08:30:30 INFO BlockManagerMasterActor: Registering block manager localhost:37295 with 267.3 MB RAM, BlockManagerId(<driver>, localhost, 37295)
15/05/01 08:30:30 INFO BlockManagerMaster: Registered BlockManager
15/05/01 08:30:32 INFO EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/local-1430494230688
15/05/01 08:30:32 INFO MemoryStore: ensureFreeSpace(259846) called with curMem=0, maxMem=280248975
15/05/01 08:30:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 253.8 KB, free 267.0 MB)
15/05/01 08:30:32 INFO MemoryStore: ensureFreeSpace(21134) called with curMem=259846, maxMem=280248975
15/05/01 08:30:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.6 KB, free 267.0 MB)
15/05/01 08:30:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:37295 (size: 20.6 KB, free: 267.2 MB)
15/05/01 08:30:32 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/01 08:30:32 INFO SparkContext: Created broadcast 0 from textFile at JavaWordCount.java:19
15/05/01 08:30:32 INFO FileInputFormat: Total input paths to process : 1
15/05/01 08:30:33 INFO SparkContext: Starting job: collect at JavaWordCount.java:83
15/05/01 08:30:33 INFO DAGScheduler: Registering RDD 3 (mapToPair at JavaWordCount.java:29)
15/05/01 08:30:33 INFO DAGScheduler: Registering RDD 7 (mapToPair at JavaWordCount.java:56)
15/05/01 08:30:33 INFO DAGScheduler: Got job 0 (collect at JavaWordCount.java:83) with 1 output partitions (allowLocal=false)
15/05/01 08:30:33 INFO DAGScheduler: Final stage: Stage 2(collect at JavaWordCount.java:83)
15/05/01 08:30:33 INFO DAGScheduler: Parents of final stage: List(Stage 1)
15/05/01 08:30:33 INFO DAGScheduler: Missing parents: List(Stage 1)
15/05/01 08:30:33 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at mapToPair at JavaWordCount.java:29), which has no missing parents
15/05/01 08:30:33 INFO MemoryStore: ensureFreeSpace(4240) called with curMem=280980, maxMem=280248975
15/05/01 08:30:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 267.0 MB)
15/05/01 08:30:33 INFO MemoryStore: ensureFreeSpace(2480) called with curMem=285220, maxMem=280248975
15/05/01 08:30:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 267.0 MB)
15/05/01 08:30:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:37295 (size: 2.4 KB, free: 267.2 MB)
15/05/01 08:30:33 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/01 08:30:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/05/01 08:30:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at mapToPair at JavaWordCount.java:29)
15/05/01 08:30:33 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/05/01 08:30:33 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1388 bytes)
15/05/01 08:30:33 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/05/01 08:30:33 INFO Executor: Fetching http://10.0.2.15:45085/jars/javawordcount-0.0.1-SNAPSHOT.jar with timestamp 1430494230600
15/05/01 08:30:33 INFO Utils: Fetching http://10.0.2.15:45085/jars/javawordcount-0.0.1-SNAPSHOT.jar to /tmp/fetchFileTemp6146690575433412492.tmp
15/05/01 08:30:33 INFO Executor: Adding file:/tmp/spark-1972edea-edbb-4ccb-92d1-5ddb376207db/javawordcount-0.0.1-SNAPSHOT.jar to class loader
15/05/01 08:30:33 INFO HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/input/wordcount.txt:0+1539
15/05/01 08:30:33 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/05/01 08:30:33 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/05/01 08:30:33 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/05/01 08:30:33 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/05/01 08:30:33 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/05/01 08:30:33 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1895 bytes result sent to driver
15/05/01 08:30:33 INFO DAGScheduler: Stage 0 (mapToPair at JavaWordCount.java:29) finished in 0.570 s
15/05/01 08:30:33 INFO DAGScheduler: looking for newly runnable stages
15/05/01 08:30:33 INFO DAGScheduler: running: Set()
15/05/01 08:30:33 INFO DAGScheduler: waiting: Set(Stage 1, Stage 2)
15/05/01 08:30:33 INFO DAGScheduler: failed: Set()
15/05/01 08:30:33 INFO DAGScheduler: Missing parents for Stage 1: List()
15/05/01 08:30:33 INFO DAGScheduler: Missing parents for Stage 2: List(Stage 1)
15/05/01 08:30:33 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[7] at mapToPair at JavaWordCount.java:56), which is now runnable
15/05/01 08:30:33 INFO MemoryStore: ensureFreeSpace(3680) called with curMem=287700, maxMem=280248975
15/05/01 08:30:33 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.6 KB, free 267.0 MB)
15/05/01 08:30:33 INFO MemoryStore: ensureFreeSpace(2037) called with curMem=291380, maxMem=280248975
15/05/01 08:30:33 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2037.0 B, free 267.0 MB)
15/05/01 08:30:33 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:37295 (size: 2037.0 B, free: 267.2 MB)
15/05/01 08:30:33 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/05/01 08:30:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 547 ms on localhost (1/1)
15/05/01 08:30:33 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/05/01 08:30:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[7] at mapToPair at JavaWordCount.java:56)
15/05/01 08:30:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/05/01 08:30:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/05/01 08:30:33 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1115 bytes)
15/05/01 08:30:33 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/05/01 08:30:33 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/05/01 08:30:33 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms
15/05/01 08:30:34 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1000 bytes result sent to driver
15/05/01 08:30:34 INFO DAGScheduler: Stage 1 (mapToPair at JavaWordCount.java:56) finished in 0.155 s
15/05/01 08:30:34 INFO DAGScheduler: looking for newly runnable stages
15/05/01 08:30:34 INFO DAGScheduler: running: Set()
15/05/01 08:30:34 INFO DAGScheduler: waiting: Set(Stage 2)
15/05/01 08:30:34 INFO DAGScheduler: failed: Set()
15/05/01 08:30:34 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 154 ms on localhost (1/1)
15/05/01 08:30:34 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/05/01 08:30:34 INFO DAGScheduler: Missing parents for Stage 2: List()
15/05/01 08:30:34 INFO DAGScheduler: Submitting Stage 2 (ShuffledRDD[8] at reduceByKey at JavaWordCount.java:56), which is now runnable
15/05/01 08:30:34 INFO MemoryStore: ensureFreeSpace(2240) called with curMem=293417, maxMem=280248975
15/05/01 08:30:34 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.2 KB, free 267.0 MB)
15/05/01 08:30:34 INFO MemoryStore: ensureFreeSpace(1412) called with curMem=295657, maxMem=280248975
15/05/01 08:30:34 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1412.0 B, free 267.0 MB)
15/05/01 08:30:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:37295 (size: 1412.0 B, free: 267.2 MB)
15/05/01 08:30:34 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/05/01 08:30:34 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:838
15/05/01 08:30:34 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (ShuffledRDD[8] at reduceByKey at JavaWordCount.java:56)
15/05/01 08:30:34 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/05/01 08:30:34 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1126 bytes)
15/05/01 08:30:34 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
15/05/01 08:30:34 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/05/01 08:30:34 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/05/01 08:30:34 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1679 bytes result sent to driver
15/05/01 08:30:34 INFO DAGScheduler: Stage 2 (collect at JavaWordCount.java:83) finished in 0.033 s
15/05/01 08:30:34 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 12 ms on localhost (1/1)
15/05/01 08:30:34 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
15/05/01 08:30:34 INFO DAGScheduler: Job 0 finished: collect at JavaWordCount.java:83, took 1.141125 s
[(T,1), (w,5), (s,8), (d,9), (e,25), (p,1), (L,1), (B,1), (a,19), (t,11), (i,9), (;,1), (y,4), (u,6), (.,2), (h,10), (k,1), (b,3), (A,2), (O,1), (n,16), (o,10), (f,5), (I,2), (-,2), (v,5), (H,1), (,,2), (l,10), (g,3), (r,8), (m,8), (c,2)]
[cloudera@quickstart JavaWordCount]$

 
