본문 바로가기
Data/Hadoop

[하둡 완벽 가이드] Chapter 2 맵리듀스

by 양진주 2024. 3. 17.

2.1 기상 데이터셋 

2.1.1 데이터 포맷 

행 단위 아스키 형식 

1901~2001 연도별 디렉터리 존재 

 

2.2 유닉스 도구로 데이터 분석하기 

#!/usr/bin/env bash
for year in all/*		#압축된 연도별 파일 반복적으로 돎 
do
 echo -ne 'basename $year .gz'"\t"
 gunzip -c $year | \			#해당 연도 출력 
  awk '{ temp = substr($0, 88, 5) + 0;			
  		 q = substr($0, 93, 1);
   		 if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
       END { print max }'			#정수형 변환, 유효값 & 신뢰 여부 확인 -> 최고기온 변경 
done

 

실행 과정의 첫부분

프로그램 각 부분 병렬 수행 ➡️ 처리 속도 ⤴️

❓문제점1
동일한 크기로 나누는 것이 쉽고 명확하지 않음 ➡️ 전체 입력 파일 / 고정 길이의 데이터 청크 
❓문제점2
독립적인 프로세스의 결과를 모두 합치는 데 많은 처리 필요할 수 있음 
❓문제점3
단일 머신의 처리 능력 한계 ➡️ 코디네이션 & 신뢰성의 범주에 속하는 요소 추가 고려 

⭐병렬 처리의 복잡함을 해결하기 위해 하둡 같은 프레임워크를 사용 

 

2.3 하둡으로 데이터 분석하기 

2.3.1 맵과 리듀스

맵리듀스 = 맵 단계 + 리듀스 단계 (각 단계: 입력과 출력으로 키-값의 쌍, 맵 함수 + 리듀스 함수 작성)

1. 맵 단계 입력: NCDC 원본 데이터 + 텍스트 입력 포맷 선택
키(파일의 시작부에서 각 행이 시작되는 지점까지의 오프셋, 무시 가능) - 값(각 행, 문자열)의 쌍 

2. 맵 함수 작성: 데이터의 준비 단계, 잘못된 레코드 걸러주는 작업 
각 행에서 연도, 기온 추출 

➡️ 각 행: 키-값 쌍으로 변환되어 맵 함수의 입력이 됨 

3. 맵 함수의 출력 ➡️ 리듀스 함수의 입력: 맵리듀스 프레임워크에 의해 처리됨(키-값 쌍: 키를 기준으로 정렬, 그룹화) 

 

 

2.3.2 자바 맵리듀스 

//최고 기온을 구하는 Mapper 예제 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;		//자바 integer (기온)
import org.apache.hadoop.io.LongWritable;		//자바 long
import org.apache.hadoop.io.Text;			//자바 string (연도)
import org.apache.hadoop.mapreduce.Mapper;		//Mapper 클래스: 제너릭 타입, 네 개의 정규 타입 매개변수(입력키, 입력값, 출력키, 출력값)을 가짐 

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int MISSING=9999;
	
	@Override
	public void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException {
		
		String line=value.toString();
		String year=line.substring(15, 19);		//원하는 칼럼 추출 
		int airTemperature;
		if (line.charAt(87) == '+') { // parseInt 함수는 앞에 더하기 기호가 있으면 안 됨 
			airTemperature=Integer.parseInt(line.substring(88, 92));
		} else {
			airTemperature=Integer.parseInt(line.substring(87, 92));
		}
		String quality=line.substring(92, 93);
		if (airTemperature != MISSING && quality.matches("[01459]")) {
			context.write(new Text(year), new IntWritable(airTemperature));
		}
	}
}

 

 

//최고 기온을 구하는 Reducer 예제 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;		//Reducer 클래스: 제너릭 타입, 네 개의 정규 타입 매개변수(입력키, 입력값, 출력키, 출력값)을 가짐 

public class MaxTemperatureReducer
	extends Reducer<Text, IntWritable, Text, IntWritable> {
	
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
		throws IOException, InterruptedException {
		
		int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
        	maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, neew IntWritable(maxValue));
	}
}

 

 

//기상 데이터셋에서 최고 기온을 찾는 애플리케이션 

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;			//잡 명세서 작성 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
	public static void main(String[] args) throws Exception {
    	if (args.length != 2) { 
        	System.err.println("Usate: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);		//하둡 클러스터에서 잡을 실행할 때는 JAR 파일로 묶어야 함 
        job.setJobName("Max temperature");
        
        FileInputFormat.addInputPath(job, new Path(args[0]));		//입력 경로 지정 (addInputPath()를 여러 번 호출하면 다수의 입력 경로 지정) 
        FileOutputFormat.addOutputPath(job, new Path(args[1]));		//출력 경로 지정 
        
        job.setMapperClass(MaxTemperatureMapper.class);		//맵 입출력 데이터 타입 지정
        job.setReducerClass(MaxTemperatureReducer.class);		//리듀스 입출력 데이터 타입 지정 
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);		//리듀스 함수의 출력 타입 지정 
        
        //리듀스 함수의 입력 타입 TextInputFormat 사용(명시적 지정 X)
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);		//잡을 제출한 후 잡이 모두 끝날 때까지 기다림, 중간 결과 생성 여부 표시 인자 true: 잡은 콘솔로 진척 상황 보고 
    }
}

 

 

2.4 분산형으로 확장하기

HDFS(분산 파일시스템), YARN(하둡 자원 관리 시스템) 작동 방식

2.4.1 데이터 흐름

잡: 클라이언트가 수행하는 작업의 기본 단위

= 입력 데이터 + 맵리듀스 프로그램 + 설정 정보 

 

- 맵 태스크: YARN을 이용하여 스케줄링, 클러스터의 여러 노드에서 실행

- 리듀스 태스크: YARN을 이용하여 스케줄링, 클러스터의 여러 노드에서 실행

 

잡의 입력/입력 스플릿(고정 크기, 일반적으로 128MB - HDFS 블록 기본 크기) ➡️ 각 스플릿마다 하나의 맵 태스트 생성, 스플릿의 각 레코드 사용자 정의 맵 함수로 처리

 

데이터 지역성 최적화: 네트워크의 대역폭 사용 ❌ ➡️ HDFS 내의 입력 데이터가 있는 노드에서 맵 태스크를 실행할 때 가장 빠르게 작동 

 

맵 태스크의 결과: 로컬 디스크에 저장 

리듀스 태스크: 모든 매퍼의 출력 결과를 입력으로 받음 ➡️ 데이터 지역성의 장점 ❌ 

리듀스 출력에 대한 HDFS 블록의 첫번째 복제본 ▶️ 로컬 노드에 저장 

나머지 복제본 ▶️ 외부 렉에 저장 [Chapter 3] 

 

단일 리듀스 태스크의 맵리듀스 데이터 흐름

 

리듀스 태스크의 수: 입력 크기와 상관 없이 독립적으로 지정 [8.1.1]

리듀스 多 ➡️ 맵 태스크: 리듀스 수만큼 파티션 생성, 맵의 결과 각 파티션에 분배 

다수의 리듀스 태스크가 있는 맵리듀스 데이터 흐름

셔플: 맵 - 리듀스 사이의 데이터 흐름 [7.3]

리듀스 태스크가 없는 맵리듀스 데이터 흐름

셔플 필요 ❌, 모든 처리 과정을 완전히 병렬로 처리하는 경우 

2.4.2 컴바이너 함수

컴바이너 함수: 맵의 결과 처리 ➡️ 맵과 리듀스 태스크 사이의 데이터 전송 최소화

- 호출 빈도와 상관없이 리듀스의 결과가 언제나 같도록 보장

- Reducer 클래스를 사용해서 정의 

public class MaxTemperatureWithCombiner{

	public static void main(String[] args) throws Exception {
    	if (args.length != 2) { 
        	System.err.println("Usage: MaxTemperatureWithCombiner <input path> " + "<output path>");
            System.exit(-1);
        }
        
        Job job = new Job();
        job.setJarByClass(MaxTemperatureWithCombiner.class);		//하둡 클러스터에서 잡을 실행할 때는 JAR 파일로 묶어야 함 
        job.setJobName("Max temperature");
        
        FileInputFormat.addInputPath(job, new Path(args[0]));		//입력 경로 지정 (addInputPath()를 여러 번 호출하면 다수의 입력 경로 지정) 
        FileOutputFormat.setOutputPath(job, new Path(args[1]));		//출력 경로 지정 
        
        job.setMapperClass(MaxTemperatureMapper.class);		//맵 입출력 데이터 타입 지정
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);		//리듀스 입출력 데이터 타입 지정 
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);		//리듀스 함수의 출력 타입 지정 
        
        //리듀스 함수의 입력 타입 TextInputFormat 사용(명시적 지정 X)
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);		//잡을 제출한 후 잡이 모두 끝날 때까지 기다림, 중간 결과 생성 여부 표시 인자 true: 잡은 콘솔로 진척 상황 보고 
    }
}

2.4.3 분산 맵리듀스 잡 실행하기

[6장]

2.5 하둡 스트리밍

하둡 스트리밍: 유닉스 표준 스트림 사용

- 텍스트 처리에 적합 

2.5.1 루비

#루비로 작성한 최고 기온을 찾는 맵 함수 

#!/usr/bin/env ruby

STDIN.each_line do |line|		#STDIN에서 각 행을 처리하는 블록을 반복적으로 실행
	val = line
    year, temp, q = val[15, 4], val[87, 5], val[92, 1]
    puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)		#탭 문자로 구분된 연도와 기온 표준 출력(puts)
end

 

#루비로 작성한 최고 기온을 찾는 리듀스 함수 

#!/usr/bin/env ruby

last_key, max_val = nil, -1000000
STDIN.each_line do |line|
	key, val = line.split("\t")
    if last_key && last_key != key
    	puts "#{lst_key}\t#{max_val}"
        last_key, max_val = key, val.to_i
    else
    	last_key, max_val = key, [max_val, val.to_i].max
    end
end
puts "#{last_key}\t#{max_val}" if last_key

2.5.2 파이썬

#파이썬으로 작성한 최고 기온을 찾는 맵 함수

#!/usr/bin/env python

import re
import sys

for line in sys.stdin:
	val = ine.strip()
	(year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
    	print "%s\t%s" % (year, temp)
#파이썬으로 작성한 최고 기온을 찾는 리듀스 함수

#!/usr/bin/env python

import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
	(key, val) = line.strip().split("\t")
    if last_key and last_key != key:
    	print "%s\t%s" % (last_key, max_val)
    	(last_key, max_val) = (key, int(val))
	else:
    	(last_key, max_val) = (key, max(max_val, int(val)))
        
    if last_key:
    	print "%s\t%s" % (last_key, max_val)