2.1 기상 데이터셋
2.1.1 데이터 포맷
행 단위 아스키 형식
1901~2001 연도별 디렉터리 존재
2.2 유닉스 도구로 데이터 분석하기
#!/usr/bin/env bash
for year in all/* #압축된 연도별 파일 반복적으로 돎
do
echo -ne 'basename $year .gz'"\t"
gunzip -c $year | \ #해당 연도 출력
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }' #정수형 변환, 유효값 & 신뢰 여부 확인 -> 최고기온 변경
done
프로그램 각 부분 병렬 수행 ➡️ 처리 속도 ⤴️
❓문제점1
동일한 크기로 나누는 것이 쉽고 명확하지 않음 ➡️ 전체 입력 파일 / 고정 길이의 데이터 청크
❓문제점2
독립적인 프로세스의 결과를 모두 합치는 데 많은 처리 필요할 수 있음
❓문제점3
단일 머신의 처리 능력 한계 ➡️ 코디네이션 & 신뢰성의 범주에 속하는 요소 추가 고려
⭐병렬 처리의 복잡함을 해결하기 위해 하둡 같은 프레임워크를 사용
2.3 하둡으로 데이터 분석하기
2.3.1 맵과 리듀스
맵리듀스 = 맵 단계 + 리듀스 단계 (각 단계: 입력과 출력으로 키-값의 쌍, 맵 함수 + 리듀스 함수 작성)
1. 맵 단계 입력: NCDC 원본 데이터 + 텍스트 입력 포맷 선택
키(파일의 시작부에서 각 행이 시작되는 지점까지의 오프셋, 무시 가능) - 값(각 행, 문자열)의 쌍
2. 맵 함수 작성: 데이터의 준비 단계, 잘못된 레코드 걸러주는 작업
각 행에서 연도, 기온 추출
➡️ 각 행: 키-값 쌍으로 변환되어 맵 함수의 입력이 됨
3. 맵 함수의 출력 ➡️ 리듀스 함수의 입력: 맵리듀스 프레임워크에 의해 처리됨(키-값 쌍: 키를 기준으로 정렬, 그룹화)
2.3.2 자바 맵리듀스
//최고 기온을 구하는 Mapper 예제
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; //자바 integer (기온)
import org.apache.hadoop.io.LongWritable; //자바 long
import org.apache.hadoop.io.Text; //자바 string (연도)
import org.apache.hadoop.mapreduce.Mapper; //Mapper 클래스: 제너릭 타입, 네 개의 정규 타입 매개변수(입력키, 입력값, 출력키, 출력값)을 가짐
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING=9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line=value.toString();
String year=line.substring(15, 19); //원하는 칼럼 추출
int airTemperature;
if (line.charAt(87) == '+') { // parseInt 함수는 앞에 더하기 기호가 있으면 안 됨
airTemperature=Integer.parseInt(line.substring(88, 92));
} else {
airTemperature=Integer.parseInt(line.substring(87, 92));
}
String quality=line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
//최고 기온을 구하는 Reducer 예제
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; //Reducer 클래스: 제너릭 타입, 네 개의 정규 타입 매개변수(입력키, 입력값, 출력키, 출력값)을 가짐
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, neew IntWritable(maxValue));
}
}
//기상 데이터셋에서 최고 기온을 찾는 애플리케이션
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job; //잡 명세서 작성
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usate: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class); //하둡 클러스터에서 잡을 실행할 때는 JAR 파일로 묶어야 함
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0])); //입력 경로 지정 (addInputPath()를 여러 번 호출하면 다수의 입력 경로 지정)
FileOutputFormat.addOutputPath(job, new Path(args[1])); //출력 경로 지정
job.setMapperClass(MaxTemperatureMapper.class); //맵 입출력 데이터 타입 지정
job.setReducerClass(MaxTemperatureReducer.class); //리듀스 입출력 데이터 타입 지정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); //리듀스 함수의 출력 타입 지정
//리듀스 함수의 입력 타입 TextInputFormat 사용(명시적 지정 X)
System.exit(job.waitForCompletion(true) ? 0 : 1); //잡을 제출한 후 잡이 모두 끝날 때까지 기다림, 중간 결과 생성 여부 표시 인자 true: 잡은 콘솔로 진척 상황 보고
}
}
2.4 분산형으로 확장하기
HDFS(분산 파일시스템), YARN(하둡 자원 관리 시스템) 작동 방식
2.4.1 데이터 흐름
잡: 클라이언트가 수행하는 작업의 기본 단위
= 입력 데이터 + 맵리듀스 프로그램 + 설정 정보
- 맵 태스크: YARN을 이용하여 스케줄링, 클러스터의 여러 노드에서 실행
- 리듀스 태스크: YARN을 이용하여 스케줄링, 클러스터의 여러 노드에서 실행
잡의 입력/입력 스플릿(고정 크기, 일반적으로 128MB - HDFS 블록 기본 크기) ➡️ 각 스플릿마다 하나의 맵 태스트 생성, 스플릿의 각 레코드 사용자 정의 맵 함수로 처리
데이터 지역성 최적화: 네트워크의 대역폭 사용 ❌ ➡️ HDFS 내의 입력 데이터가 있는 노드에서 맵 태스크를 실행할 때 가장 빠르게 작동
맵 태스크의 결과: 로컬 디스크에 저장
리듀스 태스크: 모든 매퍼의 출력 결과를 입력으로 받음 ➡️ 데이터 지역성의 장점 ❌
리듀스 출력에 대한 HDFS 블록의 첫번째 복제본 ▶️ 로컬 노드에 저장
나머지 복제본 ▶️ 외부 렉에 저장 [Chapter 3]
리듀스 태스크의 수: 입력 크기와 상관 없이 독립적으로 지정 [8.1.1]
리듀스 多 ➡️ 맵 태스크: 리듀스 수만큼 파티션 생성, 맵의 결과 각 파티션에 분배
셔플: 맵 - 리듀스 사이의 데이터 흐름 [7.3]
셔플 필요 ❌, 모든 처리 과정을 완전히 병렬로 처리하는 경우
2.4.2 컴바이너 함수
컴바이너 함수: 맵의 결과 처리 ➡️ 맵과 리듀스 태스크 사이의 데이터 전송 최소화
- 호출 빈도와 상관없이 리듀스의 결과가 언제나 같도록 보장
- Reducer 클래스를 사용해서 정의
public class MaxTemperatureWithCombiner{
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCombiner <input path> " + "<output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperatureWithCombiner.class); //하둡 클러스터에서 잡을 실행할 때는 JAR 파일로 묶어야 함
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0])); //입력 경로 지정 (addInputPath()를 여러 번 호출하면 다수의 입력 경로 지정)
FileOutputFormat.setOutputPath(job, new Path(args[1])); //출력 경로 지정
job.setMapperClass(MaxTemperatureMapper.class); //맵 입출력 데이터 타입 지정
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class); //리듀스 입출력 데이터 타입 지정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); //리듀스 함수의 출력 타입 지정
//리듀스 함수의 입력 타입 TextInputFormat 사용(명시적 지정 X)
System.exit(job.waitForCompletion(true) ? 0 : 1); //잡을 제출한 후 잡이 모두 끝날 때까지 기다림, 중간 결과 생성 여부 표시 인자 true: 잡은 콘솔로 진척 상황 보고
}
}
2.4.3 분산 맵리듀스 잡 실행하기
[6장]
2.5 하둡 스트리밍
하둡 스트리밍: 유닉스 표준 스트림 사용
- 텍스트 처리에 적합
2.5.1 루비
#루비로 작성한 최고 기온을 찾는 맵 함수
#!/usr/bin/env ruby
STDIN.each_line do |line| #STDIN에서 각 행을 처리하는 블록을 반복적으로 실행
val = line
year, temp, q = val[15, 4], val[87, 5], val[92, 1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/) #탭 문자로 구분된 연도와 기온 표준 출력(puts)
end
#루비로 작성한 최고 기온을 찾는 리듀스 함수
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{lst_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key
2.5.2 파이썬
#파이썬으로 작성한 최고 기온을 찾는 맵 함수
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = ine.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
#파이썬으로 작성한 최고 기온을 찾는 리듀스 함수
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
'Data > Hadoop' 카테고리의 다른 글
[하둡 완벽 가이드] Chapter 6 맵리듀스 프로그래밍 (0) | 2024.05.08 |
---|---|
[하둡 완벽 가이드] Chapter 4 하둡 I/O (0) | 2024.05.05 |
[하둡] 하둡 설치하기 (0) | 2024.04.07 |
[하둡 완벽 가이드] Chapter 3 하둡 분산 파일 시스템 (0) | 2024.03.24 |
[하둡 완벽 가이드] Chapter 1 하둡과의 만남 (0) | 2024.03.17 |