본문 바로가기
Software Science

[Spark] Learning Spark

by 101Architect 2018. 7. 1.


도서 추천 

Learning Python, Dive into Python

Machine Learning for Hackers, Doing Data Science 


conf 디렉터리에 log4j.properties 파일을 만든다. 스파크 개발자들은 이미 이 파일의 예제 파일인 log4j.properties.template 파일을 넣어 놓았다. 로그를 적게 출력하기 위해 일단 conf/log4j.properties.template 파일을 conf/log4j.properties 로 카피하고 다름 라인을 찾아 간다. 

log4j.rootCategory=INFO, console

그리고 경고 메시지만 보이도록 위 라인을 다음과 같이 수정하여 로깅 레벨을 낮춘다. 

log4j.rootCategory=WARN, console


스파크는 한번에 모든 트랜스포메이션끼리의 연계를 파악한다면 결과 도출에 필요한 데이터만을 연산하는 것이 가능하다. 사실 first() 액션에서도 스파크는 처음 일치하는 라인이 나올때까지만 파일을 읽을 뿐 전체 파일을 읽거나 하지는 않는다. 

스파크의 RDD 들은 기본적으로 액션이 실행될 때마다 매번 새로 연산을 한다. 여러 액션에서 RDD 하나를 재사용 하고 싶으면 스파크에게 RDD.persist()를 사용하여 계속 결과를 유지하도록 요청할 수 있다. 


 RDD를 만드는 가장 간단한 방법은 당신이 만든 프로그램에 있는 데이터 세트를 가져다가 SparkContext 의 parallelize() 메소드에 넘겨주는 것이다. 이 방식은 셸에서 바로 자신만의 RDD 를 만들 수 있고 연산을 수행할 수 있으므로 스파크를 배우는 단계에서 특히 유용하다. 하지만 이 방식은 하나의 머신 메모리에 모든 데이터 세트를 담고 있으므로 프로토타이핑이나 테스트 목적이 아니라면 널리 쓰지 않는다는 것을 명심해야 한다. 


여유로운 수행이란 RDD에 대한 트랜스포메이션을 호출할 때 그 연산이 즉시 수행되는 것이 아니라는 뜻이다. ... 그러므로 sc.textFile() 을 호출했을 떄 실제로 필요한 시점이 되기 전까지는 로딩되지 않는다. 트랜스포메이션처럼 그런 연산들도 여러번 호출 될 수 있다. 


필드 참조가 포함된 함수 전달하기 (이러면 안됨!)

.....

def getMathchesFunctionReference(self, rdd):

# 문제: "self.isMatch" 를 전달하기 위해서는 "self"도 전달해야 한다. "self" 의 모든것을 참조하게 된다. 

return rdd.filter(self.isMatch)


flatMap() 과 map() 의 차이 RDD1 : {"ab df", "dd bb" }

mappedRDD { ["ab", "df"], ["dd", "bb"] } 

flatMappedRDD { "ab", "df", "dd", "bb" } 

펼쳐 놓는다고 생각하면 쉽다. 


만일 중복이 없는 데이터 세트를 원한다면 RDD.disticnt() 트랜스포메이션을 써서 오직 단일 데이터 요소만 포함한 새로운 RDD 를 얻을 수 있다. 하지만 distinct() 는 단일 아니템인지를 비교하기 위해 네트워크로 데이터가 이리저리 전송되어야 하므로 연산이 비싼 작업이란 점을 알아두자. 이런 shuffling 작업과 이를 피하는 방법도 있다. 


fold(), reduce() 는 각 파티션의 초기 호출에 쓰이는 "제로 밸류 zero value" 를 인자로 받는다. 이 값은 여러번 적용해도 값이 바뀌지 않아야 한다. (예: 더하기를 위한 영값은 0, 곱하기에 대한 값은 1, 리스트 연결 연산이면 빈리스트)


새로운 데이터라면 combineByKey() 는 우리가 넘겨준 createCombiner() 함수를 써서 해당 키에 대한 어큐물레이터(accumulator 계산기의 누산기를 의미하기도 하는데 스파크에서는 계속 관리될 필요가 있는 값 현재까지합,.. 을 저장해 두는 공유변수를 뜻한다.) 의 초깃값을 만든다. 중요한 점은 이 작업을 RDD 전체 기준으로 첫 키가 나올 떄가 아니라 각 파티션에서 처음 나오는 키마다 실행한다는 것이다. 


병렬화 수준 최적화 

모든 RDD 는 고정된 개수의 파티션을 갖고 있으며 이것은 RDD 에서 연산이 처리될 떄 동시 작업의 수준을 결정하기 된다.

집합 연산이나 그룹화 작업 코드 범위의 바깥에서 RDD 의 파티셔닝을 바꾸고 싶을 떄도 있다. 이런 경우 repartition() 이라는 함수를 제공하는 이를 쓰면 새로운 파티션 구성을 위해 네트워크로 데이터 교환이 일어난다.(셔플링) 즉 나름 비용이 큰 작업이라는 점이다. 


groupBy() 는 쌍을 이루지 않았거나 현재 키와 관련되지 않은 다른 조건을 써서 데이터를 그룹화하고자 하는 경우에 쓰인다. 이 함수는 원본 RDD 의 모든 데이터에 적용하는 함수를 인자로 받아 그 결과를 키로 사용한다. 

groupByKey() 와 reduce()/fold() 를 써서 코딩을 했다면 아마도 키별 집합 연산 함수를 써서 동일한 결과를 더 효율적으로 얻을 수도 있다. 이는 메모리에 있는 값에 RDD 를 합치는 대신 키별로 데이터를 합치고 키 별로 합쳐진 값의 RDD 를 받는다. 예를 들면 rdd.reduceByKey(func) 는 rdd.groupByKey().mapValues(value => value.reduece(func)) 와 동일한 RDD 를 생성하지만, 각 키에대한 값의 리스트를 만드는 과정이 없으므로 훨씬 효율적이다. 


데이터 파티셔닝

노드간 데이터 세트의 파티셔닝을 어떻게 제어할 것인가 

RDD 가 단 한번만 스캐닝 된다면 굳이 사전에 파티셔닝 할 이유도 없다. 파티셔닝은 조인 같은 키 중심의 연산에서 데이터 세트가 여러번 재활용 될때만 의미가 있다. 

모든 RDD 의 키/값 쌍에 대해 .. 어떤 키의 모음들이 임의의 노드에 함께 모여 있게 되는것을 보장해준다. 


partitionBy() 통해 스파크는 이미 userData 가 해시 파티션되어 있음을 알고 이 정보를 최대한 활용할 것이다. 

userData.joing(event) 를 호출할떄 스파크는 오직 events RDD 만 셔플해서 이벤트 데이터를 각가의 UserID 와 맞는 userData 해시 파티션이 있는 머신으로 전송할 것이다. 결과는 매우 적은 네트워크 비용을 쓰지만 속도는 매우 빨라진다. 


partitionBy() 는 트랜스포메이션임을 주의하자. 항상 새로운 RDD 를 리턴하며 기존 RDD 를 변경하지 않는다. 

결과를 영속화하고 UserData 로 저장해야 한다. 

partitionBy() 에 전달되는 100은 파티션 개수를 나타내는데 이는 얼마나 많은 병렬 작업들이 RDD 에서 이후에 이뤄질 연산작업(조인등...) 을 수행하는지 나타낸다. 일반적으로 최소 클러스터의 전체 cpu 코어 개수 이상이 되도록 한다. 

영속화를 하지 않으면 매번 네트워크 통신에 의해 셔플링이 반복되어 partitionBy() 얻는 이득이 사라진다. 



데이터 불러오기/저장하기 

- 파일포맷과 파일시스템

- 스파크 sql 을 사용한 구조화된 데이터 

- 데이터베이스와 키/값 저장소 


텍스트파일 

파티션 개수를 조절하고 싶다면 minPartitions 를 지정해줄수 잇다. 

어느 파일에서 왔는지 알아야 한다면 (시간별 데이터가 키와 함께 있는 경우) 모른다면 한번에 파일 하나씩 처리해야 할 것이다. 


json 

라인별로 하나의 json 레코드를 갖고 있다고 가정하고 동작한다. 만약 여러 라인으로 이뤄지는 json 데이터 파일이라면 대신 전체 파일을 불러와서 각 파일별로 파싱해야 한다. 

mapPartitions() 를 써서 파서를 재사용 할 수도 있다. 

json 처럼 느슨한 구조의 데이터에서는 잘못 포매팅된 레코드를 다루는 것이 큼 문제를 일으킬 수 있다. 


시퀀스 파일은 키/값 쌍의 비중첩 파일로 구성된 인기 있는 하둡의 파일 포맷이다. 

스파크가 여러개의 노드에서 병렬로 시퀀스 파일들을 효율적으로 읽을 수 잇게 해준다. 


어큐물레이터는 저보들을 누산해 주며 브로드캐스트 변수는 효과적으로 많은 값들을 분산시켜준다. 


어큐물레이터 

스파크의 map() 이나 조건 지정을 위해 filter() 에 함수를 넘길때 보통 그것들의 바깥에 있는 드라이버 프로그램에 정의도니 변수를 사용 할 수 있다. 하지만 클러스터에서 실행중인 각가의 작업들은 변수의 복사본을 받아 작업하게 되므로 업데이트된 내용이 다시 드라이버 프로그램으로 돌아오지 않는다. 

일반적인 통신 타입인 결과의 집합연산과 브로드캐스팅에 대해서 어큐물레이터와 브로드캐스트변수가 제한을 풀어준다. 


작업노드이 태스크는 어큐물레이터의 value(), 에 접근할수 없다. 이 태스크의 관점에서는 '쓰기 전용' 변수인 셈이다. 


액션에서 사용되었던 어큐물레이터들에 대한것이며, 각 태스크의 업데이트는 스파크에 의해 각 어큐물레이터에 한번씩만 반영된다. 장애나 반복 연산의 횟수와 관계 업시 절재적으로 믿을만한 값을 얻기를 원하면 foreach() 같은 액션 안에 넣어야 한다. 

액션이 아닌 RDD 트랜스포메이션에 사용되는 어큐뮬레이터에 대해서는 이런 보장을 할수가 없다. 


브로드캐스트 변수 

스파크 연산에 쓸 크고 일기 전용인 값을 모든 작업 노드에 효과적으로 전송하는데 쓴다. 

직접짠 애플리케이션이 읽기 전용인 큰 사이즈의 검색 테이블을 모든 모드에서 사용할 필요가 있다면, 혹은 머신러닝 알고리즘에서 큰 사이즈의 벡터가 작업노드에서 필요하다면..


파티션별로 작업하기 

파티션별로 작업하는 것은 각 데이터 아이템에 대한 셋업 절차의 반복을 피하게 해준다. 각 아이템마다 반복될 여지가 있는 셋업의 예가 될 수 있다. (디비연결 등) 파티션들에서 한번만 코드를 실행하게 해줌으로써 그런작업들에 대한 비용을 줄여준다. 


스파크 실행구조 

분산모드에서 스파크느 ㄴ하나의 중앙 조정자(coordinator) 와 여러개의 분산 작업 노드로 구선되는 마스터/슬레이브 구조를 사용한다. 

중앙 조정자 = 드라이버

드라이버는 익스큐터라고 불리느 ㄴ다수의 분산 작업자들과 통신한다. 드라이버는 자신만의 자바 프로세스에서 돌아가며 각 익스큐터 또한 독립된 자바 프로세스 이다. 하나의 드라이버와 익스큐터들을 합쳐서 스파크 애플리케이션이라고 부른다. 


클러스터에서 스파크 애플리캐이션을 실행할때 발생하는 단계

1. 사용자는 spark-submit 을 사용하여 어플리케이션을 제출

2. spark-submit 은 드라이버 스포그램을 실행하고 사용자가 정의한 main() 메소드를 호출

3. 드라이버프로그램은 클러스터 매니저에게 익스큐터 실행을 위한 리소스를 요청

4. 클러스터 매니저는 드라이버 프로그램을 대신해 익스큐터들을 실행

5. 드라이버 프로세스가 사용자 애플리케이션을 통해 실행. 프로그램에 작성된 RDD 의 트랜스포케이션과 액션에 기반하여 드라이버는 작업내역을 단위작업형태로 나눠 익스큐터들에게 보냄

6. 단위 작업들은 결과를 계산하고 저장하기 위해 익스큐어에 의해 실행

7. 드라이버의 main() 이 끝났거나 sparkcontext.stop() 이 호출된다면 익스큐터들은 중지되고 클러스터 매니저에 사용했던 자원을 반환한다. 



의존성 그래프를 포함하는 하나의 큰 단일 jar 를 만들어 주는 빌드도구 uber jar 혹은 assembly jar 

메이븐 빌드한 스파크 앱 패키징

mvn pakage 결과 데렉터리에서 우버 jar 와 원래의 패키징 jar 를 볼수 있다. 

jar tf target/.....jar


단독 클러스터 매니저 

클러스터에서 애플리케이션을 실행하는 간단한 방법을 제공한다. 하나의 마스와 여러개의 worker 로 구성되며 각각은 설정에 따른 용량의 메모리와 cpu 코어 개수만큼을 사용한다. 

자원 사용량 설정 

스파크를 여러애플리케이션이 공유해서 사용할때 익스큐터들 사이에 자원을 얼마나 할지 결정할 필요가 있다. 단독 클러스트 매니저는 기본 스케줄링 정책으로 여러개가 동시에 돌수 있도록 애플리케이션 마다 사용량을 한계를 지정해 줄 수 있다. 


익스큐터 메모리 

코어수의 최대 총합 


메모리 영역을 조정하는 것 외에도 어떤 작업 패턴에 대해서는 스파크의 기본 캐싱 동작의 요소들을 개선할수도 있다. 스파크의 기본 cache() 연산은 memory_only 레벨로 메모리에 데이터를 저장한다. 새로운 rdd 파티션을 저장할 메모리가 모자라면 오래된 것을 간단히 삭제 될 수있고 나중에 삭제된 데이터가 다시 필요해지면 간단하게 로컬 저장장치에서 메모리로 읽어 드릴수있다. 


반응형