Summary: This article—the first in a series on Hadoop—explores the Hadoop framework, including its fundamental elements, such as the Hadoop file system (HDFS), and the node types that are commonly used. Learn how to install and configure a single-node Hadoop cluster, and delve into the MapReduce application. Finally, discover ways to monitor and manage Hadoop using its core Web interfaces.

Vietnamese documents – Original : http://www.gocit.vn/bai-viet/xu-ly-du-lieu-phan-tan-bang-hadoop/

Part 1: Getting started

Although Hadoop is the core of data reduction for some of the largest search engines, it’s better described as a framework for the distributed processing of data. And not just data, but massive amounts of data, as would be required for search engines and the crawled data they collect. As a distributed framework, Hadoop enables many applications that benefit from parallelization of data processing.

This article is not meant to introduce you to Hadoop and its architecture but rather to demonstrate a simple Hadoop setup. In the Resources section, you can find more details on Hadoop architecture, components, and theory of operation. With that disclaimer in place, let’s dive right into Hadoop installation and configuration.

Initial setup

The origins of Hadoop

The Apache Hadoop project was inspired by and developed from earlier work by Google. Although Google holds a patent for this method of large-scale data processing, the company generously granted a license for its use in Hadoop. See the Resources section for details.

For this demonstration, we’ll use the Cloudera Hadoop distribution. You’ll find support for a variety of different Linux® distributions there, so it’s ideal for getting started.

This article assumes first that your system has Java™ technology (minimum release 1.6) and cURL installed. If not, you need to add those first (see the Resources section for more information on this installation).

Because I’m running on Ubuntu (the Intrepid release), I use the apt utility to grab the Hadoop distribution. This process is quite simple and allows me to grab the binary package without the additional details of downloading and building the source. First, I tell apt about the Cloudera site. I then create a new file, /etc/apt/sources.list.d/cloudera.list, and add the following text:

deb http://archive.cloudera.com/debian intrepid-cdh3 contrib
deb-src http://archive.cloudera.com/debian intrepid-cdh3 contrib

If you’re running Jaunty or another release, just replace intrepid with your specific release name (current support includes Hardy, Intrepid, Jaunty, Karmic, and Lenny).

Next, I grab the apt-key from Cloudera to validate the downloaded package:

$ curl -s http://archive.cloudera.com/debian/archive.key | \
    sudo apt-key add -
$ sudo apt-get update

And then install Hadoop for a pseudo-distributed configuration (all of the Hadoop daemons run on a single host):

$ sudo apt-get install hadoop-0.20-conf-pseudo

Note that this configuration is around 23MB (not including any other packages apt pulls in that may not be present). This installation is ideal for playing with Hadoop and learning about its elements and interfaces.
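
If you'd like to confirm what the package pulled in, a quick check (an addition of mine, not part of the original walkthrough) is to list the installed Hadoop packages and ask the utility for its version:

$ dpkg -l | grep hadoop
$ hadoop-0.20 version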

Finally, I set up passphrase-less SSH. If you try to use ssh localhost and a passphrase is requested, you’ll need to perform the following steps. I assume that this is a dedicated Hadoop box, as this step does have security implications (see Listing 1).

Listing 1. Setting up for passphrase-less SSH

$ sudo su -
# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

One final note is that you need to ensure that your host has sufficient storage available for the datanode (the cache). Insufficient storage manifests itself in strange ways (such as errors indicating the inability to replicate to a node).
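
A quick way to check this up front (a generic check, not something from the original walkthrough) is to look at the free space on your partitions:

$ df -h

Look at the partition that holds Hadoop's data directory (wherever hadoop.tmp.dir or dfs.data.dir points on your system) and make sure it has a comfortable amount of free space.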

 Starting Hadoop

Now, you’re ready to start Hadoop, which you effectively do by starting each of the Hadoop daemons. But first, format your Hadoop File System (HDFS) using the hadoop command. The hadoop command has a number of uses, some of which we’ll explore shortly.

First, request the namenode to format the DFS file system. You do this as part of the installation, but it’s useful to know if you ever need to generate a clean file system.

# hadoop-0.20 namenode -format

After acknowledging the request, the file system will be formatted and some information returned. Next, start the Hadoop daemons. Hadoop starts five daemons in this pseudo-distributed configuration: namenode, secondarynamenode, datanode, jobtracker, and tasktracker. When each of the daemons has been started, you’ll see a small amount of text emitted for each (identifying where its logs are stored). Each daemon runs in the background. Figure 1 illustrates what the pseudo-distributed node looks like once the startup is complete.

Figure 1. Pseudo-distributed Hadoop configuration 
Block diagram of pseudo-distributed Hadoop configuration

Hadoop provides some helper tools to simplify its startup. These tools are categorized as start (such as start-dfs) and stop (such as stop-dfs). The following short script illustrates how to start the Hadoop node:

# /usr/lib/hadoop-0.20/bin/start-dfs.sh
# /usr/lib/hadoop-0.20/bin/start-mapred.sh

To verify that the daemons are running, you can use the jps command (which is a ps utility for JVM processes). This command lists the five daemons and their process identifiers.
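
For example, the output should resemble the following (the process identifiers shown here are only illustrative and will differ on your system):

# jps
4825 NameNode
4965 SecondaryNameNode
5111 DataNode
5252 JobTracker
5399 TaskTracker
5512 Jps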

Now that the Hadoop daemons are running, let’s look back at them to introduce what each accomplishes in the Hadoop framework. The namenode is the master server in Hadoop and manages the file system namespace and access to the files stored in the cluster. There’s also a secondary namenode, which isn’t a redundant daemon for the namenode but instead provides periodic checkpointing and housekeeping tasks. You’ll find one namenode and one secondary namenode in a Hadoop cluster.

The datanode manages the storage attached to a node, of which there can be multiple nodes in a cluster. Each node storing data will have a datanode daemon running.

Finally, each cluster will have a single jobtracker that is responsible for scheduling work at the datanodes and a tasktracker per datanode that performs the actual work. The jobtracker and tasktracker behave in a master-slave arrangement, where the jobtracker distributes work across the datanodes and the tasktracker performs the task. The jobtracker also validates requested work and, if a datanode fails for some reason, reschedules the previous task.

In this simple configuration, all of these daemons simply reside on the same node (see Figure 1). But from the previous discussion, it’s easy to see how Hadoop provides parallel processing of work. Although the architecture is simple, Hadoop provides an easy way to distribute data, balance the load, and process large amounts of data in parallel in a fault-tolerant way.

 Inspecting HDFS

You can perform a couple of tests to ensure that Hadoop is up and running normally (at least the namenode). Knowing that all of your processes are available, you can use the hadoop command to inspect the local namespace (see Listing 2).

Listing 2. Checking access to the HDFS

# hadoop-0.20 fs -ls /
Found 2 items
drwxr-xr-x   -  supergroup          0 2010-04-29 16:38 /user
drwxr-xr-x   -  supergroup          0 2010-04-29 16:28 /var

From this, you can see that the namenode is up and able to service the local namespace. Notice that you’re using a command called hadoop-0.20 to inspect the file system. This utility is how you interact with the Hadoop cluster, from inspecting the file system to running jobs in the cluster. Note the command structure here: After specifying the hadoop-0.20 utility, you define a command (in this case, the generic file system shell) and one or more options (in this case, you request a file list using ls). As hadoop-0.20 is one of your primary interfaces to the Hadoop cluster, you’ll see this utility used quite a bit throughout this article. Listing 3 provides some additional file system operations with which you can explore this interface a bit further (creating a new subdirectory called test, listing its contents, and then removing it).

Listing 3. Exploring file system manipulation in Hadoop

# hadoop-0.20 fs -mkdir test
# hadoop-0.20 fs -ls test
# hadoop-0.20 fs -rmr test
Deleted hdfs://localhost/user/root/test
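
If you want to explore this interface a little further, a few other file system subcommands are worth trying; the following sketch (the file names are arbitrary) exercises file creation, disk-usage reporting, renaming, and removal:

# hadoop-0.20 fs -mkdir test
# hadoop-0.20 fs -touchz test/empty.txt
# hadoop-0.20 fs -du test
# hadoop-0.20 fs -mv test/empty.txt test/renamed.txt
# hadoop-0.20 fs -rmr test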

Testing Hadoop

Now that you have installed Hadoop and tested the basic interface to its file system, it’s time to test Hadoop in a real application. In this example, you’ll see the MapReduce process applied to a small set of data. Map and reduce are named after functions in functional programming but provide the core capability for data reduction. Map refers to the process of chopping input into a smaller set of sub-problems for processing (where these sub-problems are distributed to parallel workers). Reduce refers to the assembly of answers from the sub-problems into a single set of output. Note that I haven’t defined what processing means here, as the framework permits you to define this yourself. The canonical MapReduce example is the calculation of word frequency in a set of documents.
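
Before running word count in Hadoop, it can help to see a rough single-machine analogue of the same flow built from ordinary shell tools (an analogy of mine, not part of the original article): tr plays the role of map (tokenizing the input), sort plays the role of the shuffle, and uniq -c plays the role of reduce:

$ cat *.txt | tr -s '[:space:]' '\n' | sort | uniq -c | sort -rn | head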

Per the previous discussion, you’ll have a set of inputs and a resulting set of outputs. The first step is to create an input subdirectory in the file system into which you’ll drop your work. You do this using:

# hadoop-0.20 fs -mkdir input

Next, drop some work in the input subdirectory. In this case, use the put command, which copies a file from the local file system into HDFS (see Listing 4). Note the format below, which copies the source file into the HDFS subdirectory (input). Once done, you’ll have two text files in HDFS ready to be processed.

Listing 4. Moving files into HDFS

# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt  input
# hadoop-0.20 fs -put /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt  input

Next, you can check for the presence of the files using the ls command (see Listing 5):

Listing 5. Checking files in HDFS

# hadoop-0.20 fs -ls input
Found 2 items
-rw-r--r--  1 root supergroup 78031 2010-04-29 17:35 /user/root/input/memory-barriers.txt
-rw-r--r--  1 root supergroup 33567 2010-04-29 17:36 /user/root/input/rt-mutex-design.txt

With your work safely in HDFS, you can perform the MapReduce function. This function requires only a single command, albeit a long one, as shown in Listing 6. The command requests the execution of a JAR that actually implements a number of example programs, but this example focuses on wordcount. The jobtracker daemon requests that the datanode perform the MapReduce job, which results in a considerable amount of output (modest here, because you’re only processing two files). It shows the progress of the map and reduce functions, and then provides some useful statistics regarding the I/O for both file system and records processing.

Listing 6. Performing a MapReduce job for word frequency (wordcount) 

# hadoop-0.20 jar /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar \
    wordcount input output
10/04/29 17:36:49 INFO input.FileInputFormat: Total input paths to process : 2
10/04/29 17:36:49 INFO mapred.JobClient: Running job: job_201004291628_0009
10/04/29 17:36:50 INFO mapred.JobClient:  map 0% reduce 0%
10/04/29 17:37:00 INFO mapred.JobClient:  map 100% reduce 0%
10/04/29 17:37:06 INFO mapred.JobClient:  map 100% reduce 100%
10/04/29 17:37:08 INFO mapred.JobClient: Job complete: job_201004291628_0009
10/04/29 17:37:08 INFO mapred.JobClient: Counters: 17
10/04/29 17:37:08 INFO mapred.JobClient:   Job Counters 
10/04/29 17:37:08 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/29 17:37:08 INFO mapred.JobClient:     Launched map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:     Data-local map tasks=2
10/04/29 17:37:08 INFO mapred.JobClient:   FileSystemCounters
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_READ=47556
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_READ=111598
10/04/29 17:37:08 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182
10/04/29 17:37:08 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949
10/04/29 17:37:08 INFO mapred.JobClient:   Map-Reduce Framework
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input groups=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Combine output records=3381
10/04/29 17:37:08 INFO mapred.JobClient:     Map input records=2937
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce shuffle bytes=47562
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce output records=2974
10/04/29 17:37:08 INFO mapred.JobClient:     Spilled Records=6762
10/04/29 17:37:08 INFO mapred.JobClient:     Map output bytes=168718
10/04/29 17:37:08 INFO mapred.JobClient:     Combine input records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Map output records=17457
10/04/29 17:37:08 INFO mapred.JobClient:     Reduce input records=3381

With the processing complete, inspect the result. Recall that the point of the job is to calculate the number of times words occurred in the input files. This output is emitted as a file of tuples, representing the word and the number of times it appeared in the input. You can use the cat command (after finding the particular output file) through the hadoop-0.20 utility to emit this data (see Listing 7).

Listing 7. Reviewing the output from the MapReduce wordcount operation 

# hadoop-0.20 fs -ls /user/root/output
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-04-29 17:36 /user/root/output/_logs
-rw-r--r--   1 root supergroup      30949 2010-04-29 17:37 /user/root/output/part-r-00000
#  
# hadoop-0.20 fs -cat output/part-r-00000 | head -13
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1
"Has 7
"Inter-CPU 1
"LOAD 1
"LOCK" 1
"Locking 1
"Locks 1
"MMIO 1
"Pending 5

You can also extract the file from HDFS using the hadoop-0.20 utility (see Listing 8). You do this easily with the get utility (analogous to the put you executed earlier to write files into HDFS). For the get operation, specify the file in HDFS to extract (from your output subdirectory) and the file name to write in the local file system (output.txt).

Listing 8. Extracting the output from HDFS

# hadoop-0.20 fs -get output/part-r-00000 output.txt
# cat output.txt | head -5
!= 1
"Atomic 2
"Cache 2
"Control 1
"Examples 1

Let’s look at another example using the same JAR but a different use (here, you’ll explore a parallel grep). For this test, use your existing input files but remove the output subdirectory to recreate it for this test:

# hadoop-0.20 fs -rmr output
Deleted hdfs://localhost/user/root/output

Next, request the MapReduce job for grep. In this case, the grep is performed in parallel (the map), and then the grep results are combined (the reduce). Listing 9 provides the output for this use model (but in this case, some of the output has been pruned for brevity). Note the command format here, where your request is a grep taking input from the subdirectory called input and placing the result in a subdirectory called output. The final parameter is the string you’re searching for (in this case, 'kernel').

Listing 9. Performing a MapReduce Job for word search count (grep) 

# hadoop-0.20 jar /usr/lib/hadoop/hadoop-0.20.2+228-examples.jar \
    grep input output 'kernel'
10/04/30 09:22:29 INFO mapred.FileInputFormat: Total input paths to process : 2
10/04/30 09:22:30 INFO mapred.JobClient: Running job: job_201004291628_0010
10/04/30 09:22:31 INFO mapred.JobClient:  map 0% reduce 0%
10/04/30 09:22:42 INFO mapred.JobClient:  map 66% reduce 0%
10/04/30 09:22:45 INFO mapred.JobClient:  map 100% reduce 0%
10/04/30 09:22:54 INFO mapred.JobClient:  map 100% reduce 100%
10/04/30 09:22:56 INFO mapred.JobClient: Job complete: job_201004291628_0010
10/04/30 09:22:56 INFO mapred.JobClient: Counters: 18
10/04/30 09:22:56 INFO mapred.JobClient:   Job Counters 
10/04/30 09:22:56 INFO mapred.JobClient:     Launched reduce tasks=1
10/04/30 09:22:56 INFO mapred.JobClient:     Launched map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:     Data-local map tasks=3
10/04/30 09:22:56 INFO mapred.JobClient:   FileSystemCounters
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_READ=57
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_READ=113144
10/04/30 09:22:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=222
10/04/30 09:22:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=109
...
10/04/30 09:23:14 INFO mapred.JobClient:     Map output bytes=15
10/04/30 09:23:14 INFO mapred.JobClient:     Map input bytes=23
10/04/30 09:23:14 INFO mapred.JobClient:     Combine input records=0
10/04/30 09:23:14 INFO mapred.JobClient:     Map output records=1
10/04/30 09:23:14 INFO mapred.JobClient:     Reduce input records=1

With the job complete, inspect the output directory (to identify the results file), and then perform a file system cat operation to view its contents (see Listing 10).

Listing 10. Inspecting the output of the MapReduce job

# hadoop-0.20 fs -ls output
Found 2 items
drwxr-xr-x  - root supergroup    0 2010-04-30 09:22 /user/root/output/_logs
-rw-r--r--  1 root supergroup   10 2010-04-30 09:23 /user/root/output/part-00000
# hadoop-0.20 fs -cat output/part-00000
17 kernel

Web-based interfaces

You’ve seen how to inspect the HDFS, but if you’re looking for information about Hadoop’s operation, you’ll find the Web interfaces useful. Recall that at the top of the Hadoop cluster is the namenode, which manages the HDFS. You can explore high-level details of the file system (such as available and used space and available datanodes) as well as the running jobs through http://localhost:50070. You can dig deeper into the jobtracker (job status) through http://localhost:50030. Note that in both of these cases, you reference localhost, because all daemons are running on the same host.
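
As a quick sanity check that both interfaces are reachable (a small addition of mine, not from the original article), you can probe them from the shell before opening a browser:

$ curl -s http://localhost:50070/ > /dev/null && echo "namenode UI is up"
$ curl -s http://localhost:50030/ > /dev/null && echo "jobtracker UI is up"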

 Going farther

This article explored the installation and initial configuration of a simple (pseudo-distributed) Hadoop cluster (in this case, using Cloudera’s distribution for Hadoop). I chose this particular distribution because it simplified the installation and initial configuration of Hadoop. You can find a number of distributions for Hadoop (including the source) at apache.org. See the Resources section for more information.

But what if you lack the hardware resources to scale your Hadoop cluster for your specific needs? It turns out that Hadoop is so popular that you can easily run it within cloud infrastructures using pre-built Hadoop VMs and leased servers. Amazon provides Amazon Machine Images (AMIs) as well as compute resources within the Elastic Compute Cloud (Amazon EC2). Additionally, Microsoft recently announced coming support for Hadoop within its Windows® Azure Services Platform.

From this article, it’s easy to see how Hadoop makes distributed computing simple for processing large datasets. The next article in this series will explore how to configure Hadoop in a multi-node cluster with additional examples. See you then!


Part 2: Going further

The true power of the Hadoop distributed computing architecture lies in its distribution. In other words, the ability to distribute work to many nodes in parallel permits Hadoop to scale to large infrastructures and, similarly, the processing of large amounts of data. This article starts with a decomposition of a distributed Hadoop architecture, and then explores distributed configuration and use.

The distributed Hadoop architecture

Recall from Part 1 in this series that all Hadoop daemons were run on the same host. Although not exercising the parallel nature of Hadoop, this pseudo-distributed configuration permitted an easy way to test the features of Hadoop with minimal setup. Now, let’s explore the parallel nature of Hadoop using a cluster of machines.

From Part 1, the Hadoop configuration defined that all Hadoop daemons run on a single node. So, let’s first look at how Hadoop is naturally distributed for parallel operation. In a distributed Hadoop setup, you’ll have a master node and some number of slave nodes (see Figure 1).

Figure 1. Hadoop master and slave node decomposition 
Hadoop master and slave node decomposition

As shown in Figure 1, the master node consists of the namenode, secondary namenode, and jobtracker daemons (the so-called master daemons). In addition, this is the node from which you manage the cluster for the purposes of this demonstration (using the Hadoop utility and browser). The slave nodes consist of the tasktracker and the datanode (the slave daemons). The distinction of this setup is that the master node contains those daemons that provide management and coordination of the Hadoop cluster, whereas the slave nodes contain the daemons that implement the storage functions for the Hadoop file system (HDFS) and the MapReduce functionality (the data processing function).

For this demonstration, you create a master node and two slave nodes sitting on a single LAN. This setup is shown in Figure 2. Now, let’s explore the installation of Hadoop for multinode distribution and its configuration.

Figure 2. Hadoop cluster configuration
Hadoop cluster configuration

To simplify the deployment, you employ virtualization, which provides a few advantages. Although performance may suffer in this setting, virtualization makes it possible to create a Hadoop installation and then clone it for the other nodes. For this reason, your Hadoop cluster should appear as follows, running the master and slave nodes as virtual machines (VMs) in the context of a hypervisor on a single host (see Figure 3).

Figure 3. Hadoop cluster configuration in a virtual environment 

Hadoop cluster configuration in a virtual environment

 Upgrading Hadoop

In Part 1, you installed a special distribution for Hadoop that ran on a single node (pseudo-configuration). In this article, you update for a distributed configuration. If you’ve begun this article series here, read through Part 1 to install the Hadoop pseudo-configuration first.

In the pseudo-configuration, you performed no configuration, as everything was preconfigured for a single node. Now, you need to update the configuration. First, check the current configuration using the update-alternatives command as shown in Listing 1. This command tells you that the configuration is using conf.pseudo (the highest priority).

Listing 1. Checking the current Hadoop configuration

$ update-alternatives --display hadoop-0.20-conf
hadoop-0.20-conf - status is auto.
 link currently points to /etc/hadoop-0.20/conf.pseudo
/etc/hadoop-0.20/conf.empty - priority 10
/etc/hadoop-0.20/conf.pseudo - priority 30
Current `best' version is /etc/hadoop-0.20/conf.pseudo.

Next, create a new configuration by copying an existing one (in this case, conf.empty, as shown in Listing 1):

$ sudo cp -r /etc/hadoop-0.20/conf.empty /etc/hadoop-0.20/conf.dist

Finally, activate and check the new configuration:

Listing 2. Activating and checking the Hadoop configuration

$ sudo update-alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf \
    /etc/hadoop-0.20/conf.dist 40
$ update-alternatives --display hadoop-0.20-conf
hadoop-0.20-conf - status is auto.
 link currently points to /etc/hadoop-0.20/conf.dist
/etc/hadoop-0.20/conf.empty - priority 10
/etc/hadoop-0.20/conf.pseudo - priority 30
/etc/hadoop-0.20/conf.dist - priority 40
Current `best' version is /etc/hadoop-0.20/conf.dist.

Now, you have a new configuration called conf.dist that you’ll use for your new distributed configuration. At this stage, running in a virtualized environment, you clone this node into two additional nodes that will serve as the data nodes.

 Configuring Hadoop for distributed operation

The next step is to make all the nodes familiar with one another. You do this in the /etc/hadoop-0.20/conf.dist files called masters and slaves. The three nodes in this example are statically assigned IP addresses, as shown here (from /etc/hosts):

Listing 3. Hadoop nodes for this setup (/etc/hosts)

192.168.108.133 master
192.168.108.134 slave1
192.168.108.135 slave2

So, on the master node, you update /etc/hadoop-0.20/conf.dist/masters to identify the master node, which appears as:

master

and then identify the slave nodes in /etc/hadoop-0.20/conf.dist/slaves, which contains the following two lines:

slave1
slave2

Next, from each node, connect through Secure Shell (ssh) to each of the other nodes to ensure that pass-phraseless ssh is working. Each of these files (masters, slaves) is used by the Hadoop start and stop utilities that you used in Part 1 of this series.
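
If passphraseless ssh isn't working yet between the nodes, one common way to set it up (a sketch that assumes the root account is used on every node, as in this demonstration) is to copy the master's public key to each slave and then test the connections:

# ssh-copy-id root@slave1
# ssh-copy-id root@slave2
# ssh slave1 hostname
# ssh slave2 hostname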

Next, continue with Hadoop-specific configuration in the /etc/hadoop-0.20/conf.dist subdirectory. The following changes are required on all nodes (master and both slaves), as defined by the Hadoop documentation. First, identify the HDFS master in the file core-site.xml (Listing 4), which defines the host and port of the namenode (note the use of the master node’s IP address). The file core-site.xml defines the core properties of Hadoop.

Listing 4. Defining the HDFS master in core-site.xml

<configuration>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:54310</value>
    <description>The name and URI of the default FS.</description>
  </property>

</configuration>

Next, identify the MapReduce jobtracker. This jobtracker could exist on its own node, but for this configuration, place it on the master node as shown in Listing 5. The file mapred-site.xml contains the MapReduce properties.

Listing 5. Defining the MapReduce jobtracker in mapred-site.xml 

<configuration>

  <property>
    <name>mapred.job.tracker</name>
    <value>master:54311</value>
    <description>Map Reduce jobtracker</description>
  </property>

</configuration>

Finally, define the default replication factor (Listing 6). This value defines the number of replicas that will be created and is commonly no larger than three. In this case, you define it as 2 (the number of your datanodes). This value is defined in hdfs-site.xml, which contains the HDFS properties.

Listing 6. Defining the default replication for data in hdfs-site.xml 

<configuration>

  <property>
    <name>dfs.replication</name>
    <value>2</value>
    <description>Default block replication</description>
  </property>

</configuration>

The configuration items shown in Listing 4, Listing 5, and Listing 6 are the required elements for your distributed setup. Hadoop provides a large number of other configuration options, which allow you to tailor the entire environment. The Resources section provides more information on what’s available.
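
Because the same changes are required on every node, one convenient approach (my addition; it assumes rsync is installed and that root ssh access to the slaves is available) is to edit the files once on the master and then push the configuration directory to each slave:

# rsync -av /etc/hadoop-0.20/conf.dist/ root@slave1:/etc/hadoop-0.20/conf.dist/
# rsync -av /etc/hadoop-0.20/conf.dist/ root@slave2:/etc/hadoop-0.20/conf.dist/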

With your configuration complete, the next step is to format your namenode (the HDFS master node). For this operation, use the hadoop-0.20 utility, specifying the namenode and the operation (-format):

Listing 7. Formatting the namenode

user@master:~# sudo su -
root@master:~# hadoop-0.20 namenode -format
10/05/11 18:39:58 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = master/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2+228
STARTUP_MSG:   build =  -r cfc3233ece0769b11af9add328261295aaf4d1ad; 
************************************************************/
10/05/11 18:39:59 INFO namenode.FSNamesystem: fsOwner=root,root
10/05/11 18:39:59 INFO namenode.FSNamesystem: supergroup=supergroup
10/05/11 18:39:59 INFO namenode.FSNamesystem: isPermissionEnabled=true
10/05/11 18:39:59 INFO common.Storage: Image file of size 94 saved in 0 seconds.
10/05/11 18:39:59 INFO common.Storage: 
  Storage directory /tmp/hadoop-root/dfs/name has been successfully formatted.
10/05/11 18:39:59 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at master/127.0.1.1
************************************************************/
root@master:~#

With your namenode formatted, it’s time to start the Hadoop daemons. You do this exactly as you did for the pseudo-distributed configuration in Part 1, but here the scripts start the daemons across the distributed cluster. Note that this code starts the namenode and secondary namenode on the master (as indicated by the jps command):

Listing 8. Starting the namenode

root@master:~# /usr/lib/hadoop-0.20/bin/start-dfs.sh
starting namenode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-namenode-mtj-desktop.out
192.168.108.135: starting datanode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out
192.168.108.134: starting datanode, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-datanode-mtj-desktop.out
192.168.108.133: starting secondarynamenode, 
  logging to /usr/lib/hadoop-0.20/logs/hadoop-root-secondarynamenode-mtj-desktop.out
root@master:~# jps
7367 NameNode
7618 Jps
7522 SecondaryNameNode
root@master:~#

If you now inspect one of the slave nodes (data nodes) using jps, you’ll see that a datanode daemon now exists on each node:

Listing 9. Inspecting the datanode on one of the slave nodes

root@slave1:~# jps
10562 Jps
10451 DataNode
root@slave1:~#

The next step is to start the MapReduce daemons (jobtracker and tasktracker). You do this as shown in Listing 10. Note that the script starts the jobtracker on the master node (as defined by your configuration; see Listing 5) and the tasktrackers on each slave node. A jps command on the master node shows that the jobtracker is now running.

Listing 10. Starting the MapReduce daemons

root@master:~# /usr/lib/hadoop-0.20/bin/start-mapred.sh
starting jobtracker, logging to 
  /usr/lib/hadoop-0.20/logs/hadoop-root-jobtracker-mtj-desktop.out
192.168.108.134: starting tasktracker, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out
192.168.108.135: starting tasktracker, logging to 
  /usr/lib/hadoop-0.20/bin/../logs/hadoop-root-tasktracker-mtj-desktop.out
root@master:~# jps
7367 NameNode
7842 JobTracker
7938 Jps
7522 SecondaryNameNode
root@master:~#

Finally, check a slave node with jps. Here, you see that a tasktracker daemon has joined the datanode daemon on each slave node:

Listing 11. Inspecting the datanode on one of the slave nodes 

root@slave1:~# jps
7785 DataNode
8114 Jps
7991 TaskTracker
root@slave1:~#

The relationships between the start scripts, the nodes, and the daemons that are started are shown in Figure 4. As you can see, the start-dfs script starts the namenode, secondary namenode, and datanodes, while the start-mapred script starts the jobtracker and tasktrackers.

Figure 4. Relationship of the start scripts and daemons for each node 
Relationship of the start scripts and daemons for each node

Testing HDFS

Now that Hadoop is up and running across your cluster, you can run a couple of tests to ensure that it’s operational (see Listing 12). First, issue a file system command (fs) through the hadoop-0.20 utility and request a df (disk free) operation. As with Linux®, this command simply identifies the space consumed and available for the particular device. So, with a newly formatted file system, you’ve used no space. Next, perform an ls operation on the root of HDFS, create a subdirectory, list its contents, and remove it. Finally, you can perform an fsck (file system check) on HDFS using the fsck command within the hadoop-0.20 utility. All this tells you—along with a variety of other information (such as that two datanodes were detected)—that the file system is healthy.

Listing 12. Checking the HDFS

root@master:~# hadoop-0.20 fs -df
File system		Size	Used	Avail		Use%
/		16078839808	73728	3490967552	0%
root@master:~# hadoop-0.20 fs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2010-05-12 12:16 /tmp
root@master:~# hadoop-0.20 fs -mkdir test
root@master:~# hadoop-0.20 fs -ls test
root@master:~# hadoop-0.20 fs -rmr test
Deleted hdfs://192.168.108.133:54310/user/root/test
root@master:~# hadoop-0.20 fsck /
.Status: HEALTHY
 Total size:	4 B
 Total dirs:	6
 Total files:	1
 Total blocks (validated):	1 (avg. block size 4 B)
 Minimally replicated blocks:	1 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	2
 Average block replication:	2.0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Number of data-nodes:		2
 Number of racks:		1

The filesystem under path '/' is HEALTHY
root@master:~#

Performing a MapReduce job

The next step is to perform a MapReduce job to validate that your entire setup is working properly (see Listing 13). The first step of this process is to introduce some data. So, begin by creating a directory to hold your input data (called input), which you do using the hadoop-0.20 utility with the mkdir command. Then, use the put command of hadoop-0.20 to put two files into HDFS. You can check the contents of the input directory using the ls command of the Hadoop utility.

Listing 13. Generating input data

root@master:~# hadoop-0.20 fs -mkdir input
root@master:~# hadoop-0.20 fs -put \
    /usr/src/linux-source-2.6.27/Doc*/memory-barriers.txt input
root@master:~# hadoop-0.20 fs -put \
    /usr/src/linux-source-2.6.27/Doc*/rt-mutex-design.txt input
root@master:~# hadoop-0.20 fs -ls input
Found 2 items
-rw-r--r--  2 root supergroup  78031 2010-05-12 14:16 /user/root/input/memory-barriers.txt
-rw-r--r--  2 root supergroup  33567 2010-05-12 14:16 /user/root/input/rt-mutex-design.txt
root@master:~#

Next, kick off the wordcount MapReduce job. As in the pseudo-distributed model, you specify your input subdirectory (which contains the input files) and the output directory (which doesn’t exist but will be created by the namenode and populated with the result data):

Listing 14. Running the MapReduce wordcount job on the cluster

root@master:~# hadoop-0.20 jar \
    /usr/lib/hadoop-0.20/hadoop-0.20.2+228-examples.jar wordcount input output
10/05/12 19:04:37 INFO input.FileInputFormat: Total input paths to process : 2
10/05/12 19:04:38 INFO mapred.JobClient: Running job: job_201005121900_0001
10/05/12 19:04:39 INFO mapred.JobClient:  map 0% reduce 0%
10/05/12 19:04:59 INFO mapred.JobClient:  map 50% reduce 0%
10/05/12 19:05:08 INFO mapred.JobClient:  map 100% reduce 16%
10/05/12 19:05:17 INFO mapred.JobClient:  map 100% reduce 100%
10/05/12 19:05:19 INFO mapred.JobClient: Job complete: job_201005121900_0001
10/05/12 19:05:19 INFO mapred.JobClient: Counters: 17
10/05/12 19:05:19 INFO mapred.JobClient:   Job Counters 
10/05/12 19:05:19 INFO mapred.JobClient:     Launched reduce tasks=1
10/05/12 19:05:19 INFO mapred.JobClient:     Launched map tasks=2
10/05/12 19:05:19 INFO mapred.JobClient:     Data-local map tasks=2
10/05/12 19:05:19 INFO mapred.JobClient:   FileSystemCounters
10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_READ=47556
10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_READ=111598
10/05/12 19:05:19 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=95182
10/05/12 19:05:19 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30949
10/05/12 19:05:19 INFO mapred.JobClient:   Map-Reduce Framework
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input groups=2974
10/05/12 19:05:19 INFO mapred.JobClient:     Combine output records=3381
10/05/12 19:05:19 INFO mapred.JobClient:     Map input records=2937
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce shuffle bytes=47562
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce output records=2974
10/05/12 19:05:19 INFO mapred.JobClient:     Spilled Records=6762
10/05/12 19:05:19 INFO mapred.JobClient:     Map output bytes=168718
10/05/12 19:05:19 INFO mapred.JobClient:     Combine input records=17457
10/05/12 19:05:19 INFO mapred.JobClient:     Map output records=17457
10/05/12 19:05:19 INFO mapred.JobClient:     Reduce input records=3381
root@master:~#

The final step is to explore the output data. Because you ran the wordcount MapReduce job, the result is a single file (reduced from the processed map files). This file contains a list of tuples representing the words found in the input files and the number of times they appeared in all input files:

Listing 15. Inspecting the output of the MapReduce job

root@master:~# hadoop-0.20 fs -ls output
Found 2 items
drwxr-xr-x   - root supergroup          0 2010-05-12 19:04 /user/root/output/_logs
-rw-r--r--   2 root supergroup      30949 2010-05-12 19:05 /user/root/output/part-r-00000
root@master:~# hadoop-0.20 fs -cat output/part-r-00000 | head -13
!=	1
"Atomic	2
"Cache	2
"Control	1
"Examples	1
"Has	7
"Inter-CPU	1
"LOAD	1
"LOCK"	1
"Locking	1
"Locks	1
"MMIO	1
"Pending	5
root@master:~#

Web management interfaces

Although the hadoop-0.20 utility is extremely versatile and rich, sometimes it’s more convenient to use a GUI, instead. You can attach to the namenode for file system inspection through http://master:50070 and to the jobtracker through http://master:50030. Through the namenode, you can inspect the HDFS, as shown in Figure 5, where you inspect the input directory (which contains your input data—recall from Listing 13).

Figure 5. Inspecting the HDFS through the namenode 
Inspecting the HDFS through the namenode

Through the jobtracker, you can inspect running or completed jobs. In Figure 6, you can see an inspection of your last job (from Listing 14). This figure shows not only the various data emitted as output from the Java archive (JAR) request but also the status and number of tasks. Note here that two map tasks were performed (one for each input file) and one reduce task (to combine the outputs of the two map tasks).

Figure 6. Checking the status of a finished job
Checking the status of a finished job

Finally, you can check on the status of your datanodes through the namenode. The namenode main page identifies the number of live and dead nodes (as links) and allows you to inspect them further. The page shown in Figure 7 shows your live datanodes in addition to statistics for each.

Figure 7. Checking the status of the live datanodes 
Checking the status of the live datanodes
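
If you prefer the command line, a dfsadmin report provides much of the same datanode status information shown in these Web pages:

root@master:~# hadoop-0.20 dfsadmin -report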

Many other views are possible through the namenode and jobtracker Web interfaces, but for brevity, this sample set is shown. Within the namenode and jobtracker Web pages, you’ll find a number of links that will take you to additional information about Hadoop configuration and operation (including run time logs).

 Going further

With this installment, you’ve seen how a pseudo-distributed configuration from Cloudera can be transformed into a fully distributed configuration. Surprisingly few steps, along with an identical interface for MapReduce applications, make Hadoop a uniquely useful tool for distributed processing. Also interesting is exploring the scalability of Hadoop: by adding new datanodes (along with updating the XML configuration files and the slaves file on the master), you can easily scale Hadoop to even higher levels of parallel processing. Part 3, the final installment in this Hadoop series, will explore how to develop a MapReduce application for Hadoop.


 Part 3: Application development

The first two articles of this series focused on the installation and configuration of Hadoop for single- and multinode clusters. This final article explores programming in Hadoop—in particular, the development of a map and a reduce application within the Ruby language. I chose Ruby, because first, it’s an awesome object-oriented scripting language that you should know, and second, you’ll find numerous references in the Resources section for tutorials addressing both the Java™ and Python languages. Through this exploration of MapReduce programming, I also introduce you to the streaming application programming interface (API). This API provides the means to develop applications in languages other than the Java language.

Let’s begin with a short introduction to map and reduce (from the functional perspective), and then take a deeper dive into the Hadoop programming model and its architecture and elements that carve, distribute, and manage the work.

The origin of map and reduce

So, what are the functional elements that inspired the MapReduce programming paradigm? In 1958, John McCarthy invented a language called Lisp, which implemented both numerical and symbolic computation, but in a recursive form that is foreign to most languages in use today. (There’s actually a fascinating history of Lisp on Wikipedia that includes a useful tutorial—well worth the time to read.) Lisp was first realized on the IBM® 704, the first mass-produced computer with floating-point arithmetic hardware, which also supported another old favorite: FORTRAN.

The map function, originating in functional languages like Lisp but now common in many other languages, is an application of a function over a list of elements. What does this mean? Listing 1 provides an interpreted session with Scheme Shell (SCSH), which is a Lisp derivative. The first line defines a function called square that takes an argument and emits its square. The next line illustrates the use of the map function. As shown, with map, you provide your function and a list of elements to which the function is applied. The result is a new list containing the squared elements.

Listing 1. Demonstration of the map function in SCSH

> (define square (lambda (x) (* x x)))
> (map square '(1 3 5 7))
'(1 9 25 49)
>
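
For comparison, the same map operation in Ruby, the language used for the MapReduce scripts later in this article, looks like this (an aside of mine, not part of the original listing):

$ ruby -e 'p [1, 3, 5, 7].map { |x| x * x }'
[1, 9, 25, 49]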

Reduction also applies over a list but typically reduces the list to a scalar value. The example provided in Listing 2 illustrates another SCSH function for reducing a list to a scalar—in this case, summing the list of values in the form (1 + (2 + (3 + (4 + (5))))). Note that this is classical functional programming, relying on recursion over iteration.

Listing 2. Demonstration of reduction in SCSH

> (define (list-sum lis) (if (null? lis) 0 (+ (car lis) (list-sum (cdr lis)))))
> (list-sum '(1 2 3 4 5))
15
>
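
Ruby offers the same reduction idiom, which is worth keeping in mind when you read the reduce script later (again, an illustrative aside):

$ ruby -e 'p [1, 2, 3, 4, 5].reduce(0) { |sum, x| sum + x }'
15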

It’s interesting to note that recursion can be as efficient as iteration in imperative languages, because the recursion is commonly translated into iteration under the covers.

 Hadoop’s programming model

Google introduced the idea of MapReduce as a programming model for processing or generating large sets of data. In the canonical model, a map function processes key-value pairs, resulting in an intermediate set of key-value pairs. A reduce function then processes those intermediate key-value pairs, merging the values for the associated keys (see Figure 1). Input data is partitioned in such a way that it can be distributed among a cluster of machines for processing in parallel. In the same way, the generated intermediate data is processed in parallel, making the approach ideal for processing very large amounts of data.

Figure 1. Simplified view of MapReduce processing
Simplified view of MapReduce processing

For a quick refresher, look at the architecture from Figure 1 from the perspective of map and reduce for word count (because you’ll develop a map and reduce application in this article). When input data is provided (into the Hadoop file system [HDFS]), it is first partitioned, and then distributed to map workers (via the job tracker). Although the example in Figure 2 shows a short sentence being partitioned, typically the amount of work to partition is in the 128MB-size range for one reason: It takes a small amount of time to set work up, so having more work to do minimizes this overhead. The map workers (in the canonical example) split the work into individual vectors that contain the tokenized word and an initial value (1, in this case). When the map tasks are complete (as defined in Hadoop by the task tracker), the work is provided to the reduce worker. The reduce worker reduces the keys to a unique set, with the value representing the number of keys found.

Figure 2. Simple MapReduce example
Simple MapReduce example

Note that this process can occur on the same machine or different machines or be done sequentially or in parallel using different partitions of data, and still the result is the same.

Although the canonical view (for search index generation using word count) is one way to view Hadoop, it turns out that this model of computing can be generically applied to a number of computational problems, as you’ll see.

 The flexibility of Hadoop

From the simple example shown in Figure 2, notice that the two primary elements are the map and reduce processes. Although there is a traditional view for how these processes work, it’s not a requirement of the architecture for map and reduce to behave this way. This is the real power of Hadoop—its flexibility to implement map and reduce processes that behave in a way that solves a particular application. The word count example is useful and applicable to a large number of problems, but other models still fit within this general framework. All that’s required is the development of a map and reduce application making the processes visible to Hadoop.

Among other applications, Hadoop has even been used for machine learning applications implementing algorithms as diverse as neural networks, support vector machines, and k-means clustering (see the Resources section for more information).

 Data streaming

Although Hadoop is a Java-based framework, it’s possible to write map and reduce applications in languages other than the Java language. Streaming makes this possible. The streaming utility within Hadoop implements a type of data flow glue. With the streaming utility, you can define your own map and reduce executables (with each taking input from standard input [stdin] and providing output through standard output [stdout]), and the streaming utility reads and writes data appropriately, invoking your applications as needed (see Listing 3).

Listing 3. Using the Hadoop streaming utility

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
	-input inputData \
	-output outputData \
	-mapper map_exec \
	-reducer reduce_exec

Listing 3 illustrates how to use the streaming utility within Hadoop, while Figure 3 shows graphically how the flow is defined. Note that this is a simple example of streaming use. Numerous options are available to tailor how data is parsed, to tailor how the executables are invoked, to specify replacement executables for the partitioner or combiner, and to make other configuration tweaks (see the Resources section for more information).

Figure 3. Graphical streaming example
Graphical streaming example
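
As one sketch of those extra options (the executable names are placeholders, and the option names are taken from the standard streaming documentation rather than from this article), a combiner and an explicit number of reduce tasks could be specified like this:

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
	-input inputData \
	-output outputData \
	-mapper map_exec \
	-combiner combine_exec \
	-reducer reduce_exec \
	-numReduceTasks 2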

 Ruby example

With a basic understanding of the streaming utility under your belt, you’re ready to write a simple Ruby map and reduce application and see how to use the processes within the Hadoop framework. The example here goes with the canonical MapReduce application, but you’ll see other applications later (along with how you would implement them in map and reduce form).

Begin with the mapper. This script takes textual input from stdin, tokenizes it, and then emits a set of key-value pairs to stdout. As in most object-oriented scripting languages, this task is almost too simple in Ruby. The mapper script is shown in Listing 4 (with some comments and white space to give it a bit more size). This program uses an iterator to read a line from stdin and another iterator to split the line into individual tokens. Each token (word) is then emitted to stdout with an associated value of 1 (separated by a tab).

Listing 4. Ruby map script (map.rb)

#!/usr/bin/env ruby

# Our input comes from STDIN
STDIN.each_line do |line|

  # Iterate over the line, splitting the words from the line and emitting
  # as the word with a count of 1.
  line.split.each do |word|
    puts "#{word}\t1"
  end

end

Next, look at the reduce application. This one is slightly more complicated but uses a Ruby hash (associative array) to simplify the reduction operation (see Listing 5). This script again works through the input data from stdin (passed by the streaming utility) and splits each line into a word and a value. The hash is then checked for the word; if found, the count is added to the element. Otherwise, you create a new entry in the hash for the word and then load the count (which should be 1 from the mapper process). When all input has been processed, you simply iterate through the hash and emit the key-value pairs to stdout.

Listing 5. Ruby reduce script (reduce.rb)

#!/usr/bin/env ruby

# Create an empty word hash
wordhash = {}

# Our input comes from STDIN, operating on each line
STDIN.each_line do |line|

  # Each line will represent a word and count
  word, count = line.strip.split

  # If we have the word in the hash, add the count to it, otherwise
  # create a new one.
  if wordhash.has_key?(word)
    wordhash[word] += count.to_i
  else
    wordhash[word] = count.to_i
  end

end

# Iterate through and emit the word counters
wordhash.each {|record, count| puts "#{record}\t#{count}"}

With the map and reduce scripts done, test them from the command line. Remember to make these files executable using chmod +x. Start by generating an input file, as shown in Listing 6.

Listing 6. Generating a file of input

# echo "Hadoop is an implementation of the map reduce framework for " \
    "distributed processing of large data sets." > input
#

With this input, you can now test your mapper script, as shown in Listing 7. Recall that this script simply tokenizes the input to key-value pairs, where each value will be 1 (non-unique input).

Listing 7. Testing the mapper script

# cat input | ruby map.rb
Hadoop	1
is	1
an	1
implementation	1
of	1
the	1
map	1
reduce	1
framework	1
for	1
distributed	1
processing	1
of	1
large	1
data	1
sets.	1
#

So far, so good. Now, pull the entire application together in the original streaming form (Linux® pipes). In Listing 8, you pass your input through your map script, sort that output (optional step), and then pass the resulting intermediate data through the reducer script.

Listing 8. Simple MapReduce using Linux pipes

# cat input | ruby map.rb | sort | ruby reduce.rb
large	1
of	2
framework	1
distributed	1
data	1
an	1
the	1
reduce	1
map	1
sets.	1
Hadoop	1
implementation	1
for	1
processing	1
is	1
#
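
Because the reducer emits its hash in no particular order, you can tack one more sort onto the pipeline if you want the most frequent words first (an optional step, not shown in the original):

# cat input | ruby map.rb | sort | ruby reduce.rb | sort -rn -k2 | head -3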

Ruby with Hadoop

With your map and reduce scripts working as expected in the shell environment, put them to the test with Hadoop. I’m going to skip the Hadoop setup tasks (refer to Part 1 or Part 2 of this series to get Hadoop up and running).

The first step is to create an input directory within HDFS for your input data, and then provide a sample file on which you’ll test your scripts. Listing 9 illustrates this step (see Part 1 or Part 2 for more information on these steps).

Listing 9. Creating input data for the MapReduce process

# hadoop fs -mkdir input
# hadoop dfs -put /usr/src/linux-source-2.6.27/Documentation/memory-barriers.txt input
# hadoop fs -ls input
Found 1 items
-rw-r--r--  1 root supergroup  78031 2010-06-04 17:36 /user/root/input/memory-barriers.txt
#

Next, using the streaming utility, invoke Hadoop with the custom scripts, specifying the input data and location for output (see Listing 10). Note in this example that the -file options simply tell Hadoop to package your Ruby scripts as part of the job submission.

Listing 10. Using Hadoop streaming with custom Ruby MapReduce scripts 

# hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.2+228-streaming.jar \
    -file /home/mtj/ruby/map.rb -mapper /home/mtj/ruby/map.rb \
    -file /home/mtj/ruby/reduce.rb -reducer /home/mtj/ruby/reduce.rb \
    -input input/* -output output
packageJobJar: [/home/mtj/ruby/map.rb, /home/mtj/ruby/reduce.rb, /var/lib/hadoop-0.20/...
10/06/04 17:42:38 INFO mapred.FileInputFormat: Total input paths to process : 1
10/06/04 17:42:39 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/...
10/06/04 17:42:39 INFO streaming.StreamJob: Running job: job_201006041053_0001
10/06/04 17:42:39 INFO streaming.StreamJob: To kill this job, run:
10/06/04 17:42:39 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job ...
10/06/04 17:42:39 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/...
10/06/04 17:42:40 INFO streaming.StreamJob:  map 0%  reduce 0%
10/06/04 17:43:17 INFO streaming.StreamJob:  map 100%  reduce 0%
10/06/04 17:43:26 INFO streaming.StreamJob:  map 100%  reduce 100%
10/06/04 17:43:29 INFO streaming.StreamJob: Job complete: job_201006041053_0001
10/06/04 17:43:29 INFO streaming.StreamJob: Output: output
#

Finally, explore the output using the cat file system operation through the hadoop utility (see Listing 11).

Listing 11. Exploring the Hadoop output

# hadoop fs -ls /user/root/output
Found 2 items
drwxr-xr-x  - root supergroup      0 2010-06-04 17:42 /user/root/output/_logs
-rw-r--r--  1 root supergroup  23014 2010-06-04 17:43 /user/root/output/part-00000
# hadoop fs -cat /user/root/output/part-00000 | head -12
+--->|	4
immediate	2
Alpha)	1
enable	1
_mandatory_	1
Systems	1
DMA.	2
AMD64	1
{*C,*D},	2
certainly	2
back	2
this	23
#

So, in less than 30 lines of script, you’ve implemented the map and reduce elements and demonstrated their execution within the Hadoop framework. A simple example, but one that illustrates the real power behind Hadoop and why it’s becoming such a popular framework for processing large data sets with custom or proprietary algorithms.

 Other applications for Hadoop

Hadoop can be used in many applications beyond simply computing word counts of large data sets. All that’s needed is a representation of the data in a vector form that the Hadoop infrastructure can use. Although canonical examples use the vector representation as a key and value, there’s no restriction on how you can define the value (such as an aggregate of a number of values). This flexibility can open new opportunities for Hadoop in a richer set of applications.

One interesting application that fits squarely in the MapReduce word count model is tabulating the frequency of Web server access (discussed in the seminal Google paper). For this application, the URLs serve as the keys (as ingested from the Web server access logs). The result of the reduce process is the total number of accesses per URL for a given Web site based on the Web server logs.
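
A mapper for that use case would be only a small variation of the word-count mapper in Listing 4. The sketch below (my illustration; it assumes the Apache common/combined log format, in which the requested URL is the seventh whitespace-separated field) emits each URL with a count of 1, and the reducer from Listing 5 could be reused unchanged:

#!/usr/bin/env ruby

# Emit the requested URL from each access-log line with a count of 1.
# Assumes Apache common/combined log format (URL is the seventh field).
STDIN.each_line do |line|
  fields = line.split
  url = fields[6]
  puts "#{url}\t1" if url
end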

In machine learning applications, Hadoop has been used as a way to scale genetic algorithms for processing large populations of GA individuals (potential solutions). The map process performs the traditional genetic algorithm, seeking the best individual solution from the local pool. The reduce application then becomes a tournament of individual solutions from the map phase. This permits individual nodes to identify their best solution, and then to allow these solutions to compete in the reduce phase in a distributed display of survival of the fittest.

Another interesting application was created to identify botnets for email spam. The first step in this process was to classify email messages for the purpose of reducing them (based on a set of fingerprints) as coming from a given organization. From this filtered data, a graph was built for email that was connected in some way (for example, referring to the same link in the email message body). These related emails were then reduced to hosts (static or dynamic IP address) to identify the botnet in question.

Outside of applications that view the world through map and reduce primitives, Hadoop is useful as a means of distributing work among a cluster of machines. Map and reduce don’t necessarily force a particular type of application. Instead, Hadoop can be viewed as a way to distribute both data and algorithms to hosts for faster parallel processing.

 Hadoop application ecosystem

Although Hadoop provides a flexible framework, other applications are available that can transform its interface for other applications. One interesting example is called Hive, which is a data warehouse infrastructure with its own query language (called Hive QL). Hive makes Hadoop more familiar to those with a Structured Query Language (SQL) background, but it also supports the traditional MapReduce infrastructure for data processing.

HBase is another interesting application that resides on top of the HDFS. It’s a high-performance database system similar to Google BigTable. Instead of traditional file processing, HBase makes database tables the input and output form for MapReduce processing.

Finally, Pig is a platform on Hadoop for analyzing large data sets. Pig provides a high-level language that compiles to map and reduce applications.

Going further

This final article in the Hadoop series explored the development of a map and reduce application in Ruby for the Hadoop framework. Hopefully, from this article, you can see the real power of Hadoop. Although Hadoop restricts you to a particular programming model, that model is flexible and can be applied to a large number of applications.

Resources

Learn

  • MapReduce: Simplified Data Processing on Large Clusters is the seminal paper on MapReduce, written in 2004 by Jeff Dean and Sanjay Ghemawat. This paper remains an enjoyable read.
  • This article explored Hadoop’s streaming utility, which permits the development of map and reduce scripts in languages other than the Java language. Apache provides a great set of resources for streaming, including the Hadoop Streaming documentation and the streaming wiki (which provides a good introduction to the various command-line options).
  • Wikipedia provides great introductions to the Lisp and Scheme languages as well as a general introduction to functional programming concepts (and MapReduce).
  • To demonstrate the functional programming elements of map and reduce, this article used the Scheme shell. If you’ve ever wanted to experiment with Scheme, SCSH is a great sandbox to experiment with this powerful language. You can also learn about Scheme and scripting with C in Tim’s article Scripting with Guile (developerWorks, January 2009), or read a great Scheme introduction.
  • In the paper Map-Reduce for Machine Learning on Multicore, the MapReduce model is used to implement a variety of machine learning algorithms for multicore processors. It’s an interesting read to explore how the MapReduce model can apply to a variety of computational algorithms.
  • Hive is a data warehouse infrastructure built on top of Hadoop. It provides a query language over Hadoop data while supporting the traditional Hadoop programming model. HBase is a database representation over Hadoop’s HDFS, permitting MapReduce to operate on database tables over simple files. Finally, Pig is a platform for large data set analysis that includes a high-level language for Hadoop programming.
  • The Ruby language is the latest of the object-oriented scripting languages. It is dynamic with a focus on programmer productivity.
  • Check out this list of MapReduce and Hadoop algorithms in academic papers. This site provides an interesting perspective on how Hadoop is used for a variety of applications (from science and machine learning to web services and more).
  • Yahoo! provides a great set of resources for Hadoop at its developer network. In particular, the Yahoo! Hadoop Tutorial introduces Hadoop and provides a detailed discussion of its use and configuration.



Source: IBM Library
