7.1 맵리듀스 잡 실행 상세분석
- Job 객체의 submit ( ) 메서드 호출로 맵리듀스 잡을 실행
- 클라이언트: 맵리듀스 잡을 제출
- YARN 리소스 매니저: 클러스터 상에 계산 리소스의 할당을 제어
- YARN 노드 매니저: 클러스터의 각 머신에서 계산 컨테이너를 시작하고 모니터링
- 맵리듀스 애플리케이션 마스터: 맵리듀스 잡을 수행하는 각 태스크를 제어. 애플리케이션 마스터와 맵 리듀스 태스크는 컨테이너 내에서 실행되며, 리소스 매니저는 잡을 할당하고 노드 매니저는 태스크를 관리함
- 분산 파일시스템: 다른 단계 간에 잡 리소스 파일들을 공유하는 데 사용(보통 HDFS를 사용)
7.1.1 잡 제출
- Job의 submit ( ) 메서드: 내부의 OobSubmitter 인스턴스를 생성, submitJoblnternal () 메서드 호출
- 잡 제출 -> waitForCompletion() 메서드가 1초에 한 번씩 잡의 진행 상황을 조사하여 변경 내역이 있으면 콘솔로 보여줌
- 잡이 성공적으로 완료 -> 잡 카운터를 콘솔에 보여줌
- 실패 -> 잡 실패의 원인이 된 에러를 콘솔에 보여줌
- Jobsubmitter의 잡 제출 과정
7.1.2 잡초기화
- 리소스 매니저가 submitApplication ( ) 메서드의 호출을 받음 -> YARN 스케줄러에 요청을 전달 -> 스케줄러는 컨테이너를 하나 할당함, 리소스 매니저는 노드 매니저의 운영 규칙에 따라 애플리케이션 마스터 프로세스를 시작
- 맵리듀스 잡의 애플리케이션 마스터는 자바 애플리케이션, 메인 클래스는 MRAppMaster
- 애플리케이션 마스터
- 잡을 초기화할 때 잡의 진행 상태를 추적하기 위한 다수의 북키핑(장부) 객체 생성
- 각 태스크로부터 진행 및 종료 보고서를 받음
- 클라이언트가 계산한 입력 스플릿 정보를 공유 파일시스템에서 읽어옴
- 입력 스플릿별로 맵 태스크 객체를 생성
- Job의 setNumReduceTask ( ) 메서드로 지정한 mapreduce.job.reduces 속성의 값(리듀서 수)만큼 맵 태스크 객체를 생성 <- 각 태스크는 ID를 부여받음
- 애플리케이션 마스터는 맵리듀스 잡을 구성하는 태스크를 실행할 방법을 결정해야 함
- 잡의 크기가 작다면 -> 애플리케이션 마스터는 태스크를 자신의 JVM에서 실행할수도 있음 = 우버 되었다 / 우버 태스크로 실행된다
- 어느 정도 돼야 잡이 작다고 할 수 있을까? -> 작은 잡이란 10개 미만의 매퍼와 하나 의 리듀서, HDFS 블록 하나보다 작은 크기의 입력
- 태스크를 실행하기 전에 애플리케이션 마스터는 OutputCommitter의 setupjob() 메서드를 호출: 기본 클래스인 FileOutputCommitter는 잡의 최종 출력 디렉터리와 태스 크 출력을 위한 임시 작업 공간을 생성
7.1.3 태스크 할당
- 잡을우버 태스크로 실행하기 적합하지 않다면 애플리케이션 마스터는 리소스 매니저에 잡의 모든 맵과 리듀스 태스크를 위한 컨테이너를 요청
- 맵 태스크 요청이 먼저며 리듀스 태스크 요청보다 우선순위가 높음: 리듀스의 정렬 단계가 시작되기 전에 모든 맵 태스크가 완료 되어야 하기 때문
- 전체 맵 태스크의 5%가 완료되기 전까지 리 듀스 태스크의 요청은 처리되지 X
- 리듀스 태스크: 클러스터의 어느 곳에서도 실행 가능 / 맵 태스크 요청: 스케줄러가 최 대한 준수하는 데이터 지역성 제약 O
- 최적의 상황은 태스크가 데이터 로컬 일 때 = 입력 스플릿이 저장된 노드에서 맵 태스크가 실행되는 것
- 대안 랙 로컬 = 동일한 랙에 속한 노드에서 맵 태스 크가 실행되지만 입력 스플릿이 있는 노드는 아닐 때
- 개별 잡에 대한 잡 카운터를 살펴보면 지역성 수준별로 몇 개의 태스크가 실행되었는지 확인 가능
- 요청할 때 태스크를 위한 메모리 요구사항과 CPU 수를 명시: 기본적으로 1.024MB의 메모리와 가상코어 1 개를 각 맵과 리듀스 태스크에 할당
7.1.4 태스크 실행
- 리소스 매니저의 스케줄러가특정 노드 상의 컨테이너를 위한 리소스를 태스크에 할당 -> 애플리케이션 마스터는 노드 매니저와 통신하며 컨테이너를 시작
- 각태스크는 YarnChild 메인 클래스를 가진 자바 애플리케이션으로 실행
- 태스크를 실행하기 전에 필요한 리소스를 로컬로 가져와야 함
- YarnChild는 전용 JVM에서 실행: 사용자 정의 맵과 리듀스 함수에서 버그가 발생하여 강제 종료되거나 멈추어도 노드 매니저는 영향을 받지 않음
- 각 태스크는 태스크 자체와 동일한JVM에서 설정과 커밋 동작을 수행
- 파일 기반의 잡에서 커밋 동작은 임시 위치 에서 최종 위치로 태스크 출력을 옮김
- 커밋 프로토콜은 투기적 실행이 활성화되었을 때 중복 태스크 중 단 하나만 커밋하고 나머지는 버림
<스트리밍>
- 스트리밍: 사용자가 제공한 실행 파일을 시작하고 이와 통신하기 위한 목적을 가진 특별한 맵 과 리듀스 태스크를 실행
- 스트리밍 태스크: 표준 입출력 스트림을 통해 프로세스(어떤 언어로도 작성될 수 있음)와 통신
- 태스크가 실행되는 동안
- 자바 프로세스는 키-값 쌍을 외부 프로세스에 전달
- 이를 사용자 정의 맵과 리듀스 함수로 처리
- 출력 키-값 쌍을 자바 프로세스에 돌려줌
- 따라서 노드 매니저 관점에서는 자식 프로세스가 마치 맵과 리듀스 코드를 스스로 실행한 것처럼 보일 것
7.1.5 진행 상황과 상태 갱신
- 맵리듀스 잡은 수행 시간이 오래 걸리는 배치 잡: 상당한 시간이 걸리므로 사용자가 잡의 진행 상황에 대한 피드백을 받는 것은 매우 중요
- 잡과 개별 태스크는 상태 정보를 가짐: 실행 중/성공적으로 완료됨/실패와 같은 태스크의 상태, 맵과 리듀스의 진행 상황, 잡의 카운터 값, 상태 메시지 또는 명세 등
- 이러한 상태 정보는 잡의 진행 과정에서 수시로 변경됨 -> 이를 클라이언트에 전달할 방법은 무엇일까?
- 태스크가 수행되는 동안 태스크는 자신의 진행 상황을 추적
- 맵 태스크의 경우 = 처리한 입력 데이터의 비율 / 리듀스 태스크의 경우 = 리듀스가 처리한 입력 데이터의 비율을 추정
- 이를 위해 전체 진행 과정을 총 세 부분으로 나눔
<맵리듀스가 진행 중임을 판단하는 동작>
- 진행 상황을 항상 측정할 수 있는 것은 아니지만, 태스크가 무엇인가를 하고 있다는 것을 하둡에 알려주어야 함
- 하둡은 진행 중인 태스크는 실패로 보지 않기 때문에 진행 상황을 보고하는 것은 매우 중요
- 진행 상태로 판단되는 동작들: 입력 레코드 읽기 / 출력 레코드 쓰기 / 상태 명세의 설정 / 카운터 증가 / Reporter 또는 TaskAttempContext의 progress() 메서드 호출
- 태스크는 수행 중에 발생하는 다양한 이벤트를 세는 여러 카운터를 가짐: 맵의 출력 레코드 수를 세는 것과 같은 프레임워크에 내장된 카운터 나 사용자 정의 카운터가 있음
- 맵과 리듀스 태스크가 실행되면서 자식 프로세스는 부모인 애플리케이션 마스터와 밀접한 인터 페이스를 통해 통신
- 태스크는 진행 상황과 상태 정보를 집계하여 매 3초마다 이 인터페이스를 통해 애플리케이션 마스터에 보고
- 리소스 매니저 웹 UI는 실행 중인 모든 애플리케이션 각각의 애플리케이션 마스터 웹 UI 링크를 보여줌
- 잡이 진행되는 동안 클라이언트는 매초마다 애플리케이션 마스터를 폴링하여 가장 최근의 상태를 받음
7.1.6 잡 완료
- 애플리케이션 마스터가 마지막 태스크가 완료되었다는 통지를 받으면 -> 잡의 상태를 '성공'으로 변경
- 이제 잡이 상태 정보를 폴링하면 해당 잡이 성공적으로 완료되었음을 알게 되고, 사용자에게 통지할 메시지를 출력한 뒤 waitForCompletion ( ) 메서드가 반환됨
- 또한 HTTP 잡 통지를 보내도록 설정되어 있다면 애플리케이션 마스터는 이를 수행
- 마지막으로, 잡이 완료되면 애플리케이션 마스터와 태스크 컨테이너는 작업 상태를 정리하고 OutputCommitter의 commitJob ( ) 메서드를 호출
7.2 실패
- 실제 환경에서는 사용자 코드의 버그 때문에 프로세스가 강제로 죽거나 서버에 장애가 발생하는 일이 빈번
- 하둡의 가장 큰 장점은 이러한 실패를 잘 다루어 성공적으로 잡이 완료되도록 도와준다는 것
7.2.1 태스크실패
- 가장 흔한 실패의 유형은 맵 또는 리듀스 태스크 내 사용 자 코드에서 런타임 예외를 던질 때
- 예외가 발생하면
- 태스크 JVM은 종료하기 전에 부모인 애플리케이션 마스터에 에러를 보고
- 이 에러는 최종적으로 사용자 로그에 기록됨
- 애플리케이션 마스터는 이 태스크 시도를 실패로 표시하고 해당 리소스를 다른 태스크에서 사용 가 능하도록 컨테이너를 풀어줌
- 스트리밍 태스크에서는 스트리밍 프로세스가 0이 아닌 코드를 반환하면 실패로 표시
- 다른 실패 유형은 태스크 JVM이 갑작스럽게 종료하는 것: 아마도 맵리듀스 사용자 코드에 의해 드러난특정 상황으로 인해 JVM이 종료되는JVM 버그일 것
- 이때 노드 매니저는 프로세스가 종료되었음을 알게 되고, 이를 애플리케이션 마스터에 알려주어 해당 시도가 실패했다고 표시하게 함
- 행이 걸린(멈춘) 태스크의 처리
- 애플리케이션 마스터는 잠시 동안 진행 상황을 갱신받지 못함을 알게 되면 해당 태스크를 실패로 표시
- 태스크 JVM 프로세스는 이 기간 후에 자동으로 강제 종료
- 태스크를 실패로 간주하는 타임아웃 기간은 보통 10분 이고 잡 단위(또는 클러스터 단위)로 mapreduce.task.timeout 속성에 밀리초 단위의 값을 설정 가능
- 타임아웃을 0으로 설정하면: 타임아웃을 비활성화하며 따라서 실행 시간이 긴 태스크는 절대로 실패로 표시되지 않음
- 이러한 상황에서 행이 걸려 멈춘 태스크는 자신의 컨테이너를 결코 해 제하지 않을 것이며 시간이 지남에 따라 클러스터를 느리게 만드는 결과를 초래
- 따라서 이러한 접근 방법은 지양할 것, 태스크가 주기적으로 진행 상황을 확실히 보고하도록 하는 것이 좋음
- 애플리케이션 마스터는 태스크 시도 실패를 알게 되면 해당 태스크 실행을 다시 스케줄링
- 또한 태스크 시도는 강제로 종료될 수 있는데 이는 실패와 다름
- 태스크 시도는 투기적 중첩 때문에, 또는 실행 기반인 노드 매니저가 실패해서 애플리케이션 마스터가 해당 노드 매니저에서 실행되는 모든 태스크 시도를 실패로 표시할 때 강제 종료됨 -> 태스크 전체 시도 횟수에 포함되지 않음
7.2.2 애플리케이션 마스터 실패
- 맵리듀스 애플리케이션 마스터가 시도하는 최대 횟수는 mapreduce.am.max-attempts 속성으로 조절, 기본값은 2
- YARN은 클러스터에서 실행 중인 모든 YARN 애플리케이션 마스터에 대해 일괄적으로 대 시도 횟수의 제한을 줄 수 있음, 개별 애플리케이션은 이를 넘길 수 없음(기본값 2)
- 복구 작업 방식
- 애플리케이션 마스터는 주기적으로 리소스 매니저에 하트비트를 보냄
- 애플리케이션 마스터의 실패 이벤트 발생 시 리소스 매니저는 이 실패를 감지
- 새로운 컨테이너에서 실행할 새로운 마스터 인스턴스를 시작
- 맵리듀스 클라이언트는 진행 상황 보고를 위해 애플리케이션 마스터를 폴링 -> 만약 이 애플리케이션 마스터가 실패한다면 클라이언트는 새로운 애플리케이션 마스터 인스턴스의 위치를 알아내야 함
- 클라이언트는 상태 갱신을 전달할 때 타임아웃이 발생, 이때 클라이언트는 리소스 매니저에 새로운 애플리케이션 마스터의 주소를 요청
7.2.3 노드 매니저 실패
- 노드 매니저가 크래시에 의해 실패하거나 굉장히 느리게 수행 중이라면 리소스 매니저에 하 트비트 전송을 중단할 것
- 리소스 매니저는 하트비트 전 송을 중단한 노드 매니저가 10분동안 한번도 전송하지 않음을 인지하면 이를 컨테이너를 스케줄링하는 노드 풀에서 제거
- 실패한 노드 매니저에서 수행 중인 애플리케이션 마스터나 태스크는 앞의 두 절에서 기술한 메 커니즘을 통해 복구될 것
- 또한 애플리케이션 마스터는 해당 노드 매니저에서 성공적으로 실행된 맵 태스크가 완료되지 않은 잡에 속해 있다면 이를 재실행
- 애플리케이션 실패 횟수가 높으면 노드 매니저 자체가 실패하지 않았더라도 노드 매니저는 블랙 리스트에 등록됨
7.2.4 리소스 매니저 실패
- 리소스 매니저 실패는 굉장히 심각한 상황: 리소스 매니저 없이는 잡이나 태스크 컨테이너가 실행될 수 없기 때문
- 고가용성을 달성하기 위해서는 두 개의 리소스 매니 저를 활성대기 설정으로 실행해야 함
- 대기 리소스 매니저는 실패한 활성 리소스 매니저의 핵심 상태를 복구할 수 있음
- 노드 매니저 정보는 상태 저장소에 보관되지 않음: 노드 매니저가 첫 번째 하트비트를 전송할 때 새로운 리소스 매니저가 상대적으로 빠르게 재구축할 수 있기 때문
- 새로운 리소스 매니저가 시작되면 상태 저장소로부터 애플리케이션 정보를 읽고 클러스터에서 실행 중인 모든 애플리케이션의 애플리케이션 마스터를 재시작
- 대기 리소스 매니저에서 활성 리소스 매니저로의 전환은 장애극복 관리자가 담당
7.3 셔플과 정렬
- 맵리듀스는 모든 리듀서의 입력이 키를 기준으로 정렬되는 것을 확실히 보장
- 시스템이 이 러한 정렬을 수행하고 맵의 출력을 리듀서의 입력으로 전송하는 과정을 셔플이라고 함
7.3.1 맵 부분
- 맵 함수가 결과를 생산할 때 이를 단순히 디스크에 쓰지 않음
- 각 맵 태스크는 환형 구조의 메모리 버퍼를 가지고 있으며 이곳에 결과를 기록
- 버퍼의 내용이 특정 한계치에 도달하면 백그라운드 스레드가 디스크에 스필하기 시작
- 스필이 일어나 는 동안에도 맵 결과는 계속해서 버퍼에 쓰이는데 이때 버퍼가 가득 차게 되면 맵은 스필이 종료 될 때까지 블록됨
- 디스크로 쓰기 전에 스레드는 먼저 데이터를 최종적으로 전송할 리듀서 수에 맞게 파티션으로 나눔
- 각 파티션 내의 백그라운드 스레드는 키를 기준으로 인메모리 정렬을 수행
- 만약 컴바이너 함수가 존재하면 정렬의 출력에 대해 수행: 맵 출력을 더욱 축소하여 로컬 디스크에 쓰거나 리듀서에 전송할 데이터양을 줄임
- 메모리 버퍼가 스필 한계치에 도달 -> 새로운 스필 파일이 생성: 맵 태스크가 최종 결과 레코드를 쓰고 나면 여러 개의 스필 파일이 존재할 수 있음
- 최소 세 개의 스필 파일이 존재한다면: 출력 파일을 쓰기 전에 컴바이너가 다시 실행됨
- 하나 혹은 두 개의 스필만 존재한다면: 맵 출력 크기를 줄이는 양에 비해 컴바이너를 호출하는 오버헤드가 크기 때문에 이러한 맵 출력 에 대해 다시 실행하는 것은 가치가 없음
- 출력 파일의 파티션은 HTTP를 통해 리듀서에 전달됨
7.3.2 리듀스 부분
- 맵 출력 파일은 맵 태스크를 수행한 서버의 로컬 디 스크에 존재하는데, 이제는 이것이 특정 파티션에 대해 리듀스 태스크를 시작하려는 서버에서 필요한 상황
- 맵 태스크는 각기 다른 시간에 끝날 수 있으므로 리듀스 태스 크는 각 맵 태스크의 출력이 끝나는 즉시 복사하기 시작 = 리듀스 태스크의 '복사 단계'
<리듀서는 맵 출력을 인출할 서버를 어떻게 알까?>
- 맵 태스크가 성공적으로 종료되면 하트비트 전송 메커니즘을 통해 애플리케이션 마스터에 알려줌 -> 특정 잡에 대해 애플리케이션 마스터는 맵 출력과 호스트 사이의 매핑 정보를 알고 있음
- 맵 출력은 그 크기가 충분히 작다면 리듀스 태스크 JVM 메모리에 복사됨
- 복사된 파일이 디스크에 축적되면 백그라운드 스레드가 이를 더 크고 정렬된 형태의 파일로 병합 -> 이후 병합할 시간 절약
- 모든 맵 출력이 복사되는 시점에 리듀스 태스크는 정렬 단계로 이동하여 맵 출력을 병합하고 정렬 순서를 유지
- 리듀스 단계에서 리듀스 함수는 정렬된 출력 내의 각 키에 대해 호출됨
- 이 단계의 출력은 보통 HDFS와 같은 출력 파일시스템에 곧바로 써짐
7.3.3 설정 조정
- 맵리듀스 성능 향상을 위한 셔플 튜닝 방법
- 일반적인 원칙은 셔플에 가능한 한 많은 메모리를 할당하는 것
- 그러나 여기에는 트레이드 오프가 존재: 맵과 리듀스 함수가 동작하는 데 충분한 메모리 확보가 필요하기 때문
- 따라서 맵과 리듀스 함수를 작성할 때 가능한 한 적은 메모리를 사용하도록 하는 것이 최선책
- 맵 측면에서 보면 다수의 디스크 스필을 피하는 것이 최고의 성능을 내는 방법
- 리듀스 측면에서는 중간 데이터 전체가 메모리에 존재할 때 최고의 성능을 얻을 수 있음
- 기본적으로 이러한 일은 발생하지 않음: 일반적으로 리듀스 함수에 모든 메모리를 예약해두기 때문
7.4 태스크 실행
- 맵리듀스 사용자가 태스크 실행에 관해 취할 수 있는 좀 더 많은 제어사항에 대해서
7.4.1 태스크 실행 환경
- 하둡은 맵 또는 리듀스 태스크에 실행 환경에 관한 정보를 전달해줌
- 아래 표에 있는 속성은 이전 맵리듀스 API가 제공하는 Mapper 또는 Reducer의 configure() 메서드 구현을 통해 얻을 수 있는 잡의 환경 설정으로 접근 가능
<스트리밍 환경변수>
- 하둡은 잡 환경 설정 파라미터를 스트리밍 프로그램의 환경변수로 설정
- 이때 알파벳이나 숫자 이외의 문자는 밑줄(_)로 대체하여 유효한 이름으로 만듦
#파이썬 스트리밍 스크립트에서 mapreduce.job.id 속성 값을 얻어오는 방법 os.environ["mapreduce_job_id"] #MAGIC_PARAMETER 환경변수를 설정하는 예 -cmdenv MAGIC_PARAMETER=abracadabra
7.4.2 투기적 실행
- 맵리듀스 모델은 잡을 태스크로 나누고 태스크를 병렬 수행하는 형태
- 이로 인해 잡 실행 시간은 느리게 수행되는 태스크에 굉장히 민감: 단 하나의 느린 태스크가 전체 잡수행을상당히 지연시키기 때문
- 태스크는 하드웨어의 성능 저하나 소프트웨어의 잘못된 설정 등 다양한 이유로 느려질 수 있음
- 태스크의 투기적 실행: 하둡은 느린 태스크를 진단하거나 고치려 하지 않는 대신 태스크 수행이 예상했던 것보다 더 느린 상황을 감지하여 또 다른 동일한 예비 태스크를 실행
- 동시에 두 개의 복제 태스크를 실행하여 서로 경쟁하도록 하는 것이 투기적 실행이 아님을 명심할 것: 클러스터의 리소스 낭비가 심함
- 투기적 실행은 일종의 최적화며 더욱 안정적으로 잡을 실행하는 기능은 아님: 태스크 를 멈추거나 느리게 하는 버그가존재할 때 이러한 문제를 회피하기 위해 투기적 실행에 의존하는 것은 옳지 않음
- 투기적 실행과 관련된 속성
- 투기적 실행의 궁극적인 목적 = 잡 실행을 줄이는 것: 그러나 클러스터 효율성 측면에서 비용이 발생
- 혼잡한 클러스터에서 개별 잡의 실행 시간 을 줄이기 위해 투기적 실행으로 중복 태스크가 실행되면 전반적인 단위 시간당 산출물은 줄어듦
- 따라서 클러스터 관리자는클러스터 상에서 투기적 실행을 끄는 것을 선호
- 투기적 실행을 끄는 또 다른 이유: 비멱등 태스크 때문
7.4.3 출력 커미터
- 하둡 맵리듀스는 잡과 태스크가 깨끗하게 성공하거나 실패하도록 보장하기 위한 커밋 프로토콜을 사용
- setupOob ( ) 메서드는 잡을 실행하기 전에 호출되고 일반적으로 초기화를 수행하기 위해 사용
- 잡이 성공하면 commitJob ( ) 메서드가 호출, 기본적인 파일 기반 구현에서 임시 작업 공간을 삭제, _SUCCESS라는 빈 마커 파일을 생성 -> 파일시스템 클라이언트에 잡이 성공적 으로 완료되었음을 알림
- 태스크의 커밋 단계는 선택사항, needsTaskCommit ( )가 false를 반환하도록 하여 비활성화할 수 있음
- 프레임워크는 특정 태스크에 대한 여러 태스크 시도 이벤트 중에서 단 하나만을 커밋하며 나머지는 중단시킴
<태스크의 부차적인 파일>
- 맵과 리듀스 태스크의 출력을 작성하는 일반적인 방법은 키-값 쌍을 수집하는 Outputcollector 를 사용하는 것
- 주의해야 할 점은 동일 태스크의 다중 인스턴스가 동일한 파일에 쓰지 않도록 보장하는 것 -> OutputCommitter 프로토콜은 이러한 문제를 해결해 줌
- 태스크는 잡의 환경 설정에 mapreduce.task.output .dir 속성 값을 얻어 와서 작업 디렉터리를 찾을 수 있음