정화 코딩

[ML] Spam Classification via Naïve Bayes 본문

Machine Learnig

[ML] Spam Classification via Naïve Bayes

jungh150c 2024. 10. 16. 21:13

Data

hw2_MATRIX.TRAIN

DOC_WORD_MATRIX_TRAIN
2144 1448
abil absolut abus access accid accord account accur ...
0 6 1 25 1 41 1 40 1 ...
0 78 4 77 1 214 1 18 1 ...
0 26 1 149 1 197 1 37 2 ...
0 11 3 75 1 39 1 39 1 ...
...
1 12 1 1 2 9 2 19 1 ...
1 13 2 32 2 87 1 99 1 ...
1 3 1 6 1 2 1 17 2 ...
1 141 1 50 1 30 2 92 1 ...
...

첫 번째 줄 : 메타정보

두 번째 줄의 첫 번째 수 (2144) : 문서/이메일의 개수

두 번째 줄의 첫 번째 수 (1448) : 단어/토큰의 개수

세 번째 줄 : 단어/토큰 목록

각 행의 첫 번째 수 : 문서/이메일 레이블 (1: 스팸, 0: 비스팸)

i번째 행 : i번째 문서/이메일

j번째 열 : j번째 단어/토큰

(i, j) 항목 : i번째 문서에서 j번째 토큰이 나타난 횟수

trainMatrix, tokenlist, trainCategory = readMatrix('./data/hw2_MATRIX.TRAIN')
  • trainMatrix: 문서-단어 행렬. 각 행은 하나의 문서를, 각 열은 특정 토큰을 나타내며, 값은 해당 문서에서 해당 토큰이 등장한 빈도를 의미함.
  • tokenlist: 토큰 목록. 각 토큰의 이름이 나열된 배열.
  • trainCategory: 문서의 레이블 배열. 각 문서가 스팸(1)인지 비스팸(0)인지 나타내는 값들로 구성된 배열.

1. Naïve Bayes 분류기를 구현하여 스팸 분류

import matplotlib.pyplot as plt
import numpy as np

def readMatrix(file):
    fd = open(file, 'r')
    hdr = fd.readline()
    rows, cols = [int(s) for s in fd.readline().strip().split()]
    tokens = fd.readline().strip().split()
    matrix = np.zeros((rows, cols))
    Y = []
    for i, line in enumerate(fd):
        nums = [int(x) for x in line.strip().split()]
        Y.append(nums[0])
        kv = np.array(nums[1:])
        k = np.cumsum(kv[:-1:2])
        v = kv[1::2]
        matrix[i, k] = v
    return matrix, tokens, np.array(Y)

def nb_train(matrix, category):
    state = {} # 모든 학습 파라미터들을 저장하는 딕셔너리
    numDocs = matrix.shape[0] # 문서의 개수
    numTokens = matrix.shape[1] # 토큰의 개수

    numSpam = np.sum(category == 1) # category가 1인 요소의 개수 (스팸 문서 수)
    numHam = np.sum(category == 0) # category가 0인 요소의 개수 (비스팸 문서 수)

    spamTokenCounts = matrix[category == 1].sum(axis = 0) # 스팸 문서들에서 등장한 각 토큰의 총 빈도수 (벡터)
    hamTokenCounts = matrix[category == 0].sum(axis = 0) # 비스팸 문서들에서 등장한 각 토큰의 총 빈도수 (벡터)

    totalSpamTokens = spamTokenCounts.sum() # 스팸 문서들에서 등장한 모든 토큰의 총 빈도수 (스칼라)
    totalHamTokens = hamTokenCounts.sum() # 비스팸 문서들에서 등장한 모든 토큰의 총 빈도수 (스칼라)
    
    # 로그를 활용하여 언더플로우 방지
    
    # 문서가 스팸일 확률
    state['spamProb'] = np.log(numSpam + 1) - np.log(numDocs + numTokens)
    # 문서가 비스팸일 확률
    state['hamProb'] = np.log(numHam + 1) - np.log(numDocs + numTokens)

    # 스팸 문서일 때 각 토큰이 등장할 확률
    state['spamTokenLogProb'] = np.log(spamTokenCounts + 1) - np.log(totalSpamTokens + numTokens)
    # 비스팸 문서일 때 각 토큰이 등장할 확률
    state['hamTokenLogProb'] = np.log(hamTokenCounts + 1) - np.log(totalHamTokens + numTokens)

    return state

def nb_test(matrix, state):
    # 문서의 개수만큼 배열을 0으로 초기화하여 생성
    output = np.zeros(matrix.shape[0])

    # @: 행렬 곱셈 연산 -> 각 문서의 모든 토큰에 대해 로그 확률을 가중합하여 계산
    # +: 확률이 로그로 표현되어 있으니 사실상 곱

    # 각 문서에 대해 스팸일 확률 (벡터)
    spamScores = matrix @ state['spamTokenLogProb'] + state['spamProb']
    # 각 문서에 대해 비스팸일 확률 (벡터)
    hamScores = matrix @ state['hamTokenLogProb'] + state['hamProb']
    
    # 각 문서에 대해 스팸(1) 또는 비스팸(0) 예측
    output = (spamScores > hamScores).astype(int)

    return output

def evaluate(output, label):
    error = (output != label).sum() * 1. / len(output)
    print('Error: %1.4f'%error)

def main():
    trainMatrix, tokenlist, trainCategory = readMatrix('./data/hw2_MATRIX.TRAIN')
    testMatrix, tokenlist, testCategory = readMatrix('./data/hw2_MATRIX.TEST')
    
    state = nb_train(trainMatrix, trainCategory)
    output = nb_test(testMatrix, state)

    evaluate(output, testCategory)
    return

if __name__ == '__main__':
    main()

2. 스팸 분류에 중요한 상위 5개 토큰 찾기

import matplotlib.pyplot as plt
import numpy as np

def readMatrix(file):
    fd = open(file, 'r')
    hdr = fd.readline()
    rows, cols = [int(s) for s in fd.readline().strip().split()]
    tokens = fd.readline().strip().split()
    matrix = np.zeros((rows, cols))
    Y = []
    for i, line in enumerate(fd):
        nums = [int(x) for x in line.strip().split()]
        Y.append(nums[0])
        kv = np.array(nums[1:])
        k = np.cumsum(kv[:-1:2])
        v = kv[1::2]
        matrix[i, k] = v
    return matrix, tokens, np.array(Y)

def nb_train(matrix, category):
    state = {} # 모든 학습 파라미터들을 저장하는 딕셔너리
    numDocs = matrix.shape[0] # 문서의 개수
    numTokens = matrix.shape[1] # 토큰의 개수

    numSpam = np.sum(category == 1) # category가 1인 요소의 개수 (스팸 문서 수)
    numHam = np.sum(category == 0) # category가 0인 요소의 개수 (비스팸 문서 수)

    spamTokenCounts = matrix[category == 1].sum(axis = 0) # 스팸 문서들에서 등장한 각 토큰의 총 빈도수 (벡터)
    hamTokenCounts = matrix[category == 0].sum(axis = 0) # 비스팸 문서들에서 등장한 각 토큰의 총 빈도수 (벡터)

    totalSpamTokens = spamTokenCounts.sum() # 스팸 문서들에서 등장한 모든 토큰의 총 빈도수 (스칼라)
    totalHamTokens = hamTokenCounts.sum() # 비스팸 문서들에서 등장한 모든 토큰의 총 빈도수 (스칼라)
    
    # 로그를 활용하여 언더플로우 방지
    
    # 문서가 스팸일 확률
    state['spamProb'] = np.log(numSpam + 1) - np.log(numDocs + numTokens)
    # 문서가 비스팸일 확률
    state['hamProb'] = np.log(numHam + 1) - np.log(numDocs + numTokens)

    # 스팸 문서일 때 각 토큰이 등장할 확률
    state['spamTokenLogProb'] = np.log(spamTokenCounts + 1) - np.log(totalSpamTokens + numTokens)
    # 비스팸 문서일 때 각 토큰이 등장할 확률
    state['hamTokenLogProb'] = np.log(hamTokenCounts + 1) - np.log(totalHamTokens + numTokens)

    return state

def find_top_spam_indicative_tokens(state, tokenlist, n):
    # 각 토큰이 스팸 클래스에 얼마나 중요한지 계산
    logRatio = state['spamTokenLogProb'] - state['hamTokenLogProb']
    # logRatio에서 상위 5개의 인덱스
    topIndices = np.argsort(logRatio)[-n:][::-1]

    # 상위 5개 토큰과 해당 log_ratio 값 출력
    for index in topIndices:
        print(f"Token: {tokenlist[index]}, Log ratio: {logRatio[index]}")

def main():
    trainMatrix, tokenlist, trainCategory = readMatrix('./data/hw2_MATRIX.TRAIN')
    
    state = nb_train(trainMatrix, trainCategory)

    find_top_spam_indicative_tokens(state, tokenlist, 5)
    return

if __name__ == '__main__':
    main()

3. 훈련 세트 크기를 변화시키며 테스트 오류율 학습 곡선 작성

import matplotlib.pyplot as plt
import numpy as np

def readMatrix(file):
    fd = open(file, 'r')
    hdr = fd.readline()
    rows, cols = [int(s) for s in fd.readline().strip().split()]
    tokens = fd.readline().strip().split()
    matrix = np.zeros((rows, cols))
    Y = []
    for i, line in enumerate(fd):
        nums = [int(x) for x in line.strip().split()]
        Y.append(nums[0])
        kv = np.array(nums[1:])
        k = np.cumsum(kv[:-1:2])
        v = kv[1::2]
        matrix[i, k] = v
    return matrix, tokens, np.array(Y)

def nb_train(matrix, category):
    state = {} # 모든 학습 파라미터들을 저장하는 딕셔너리
    numDocs = matrix.shape[0] # 문서의 개수
    numTokens = matrix.shape[1] # 토큰의 개수

    numSpam = np.sum(category == 1) # category가 1인 요소의 개수 (스팸 문서 수)
    numHam = np.sum(category == 0) # category가 0인 요소의 개수 (비스팸 문서 수)

    spamTokenCounts = matrix[category == 1].sum(axis = 0) # 스팸 문서들에서 등장한 각 토큰의 총 빈도수 (벡터)
    hamTokenCounts = matrix[category == 0].sum(axis = 0) # 비스팸 문서들에서 등장한 각 토큰의 총 빈도수 (벡터)

    totalSpamTokens = spamTokenCounts.sum() # 스팸 문서들에서 등장한 모든 토큰의 총 빈도수 (스칼라)
    totalHamTokens = hamTokenCounts.sum() # 비스팸 문서들에서 등장한 모든 토큰의 총 빈도수 (스칼라)
    
    # 로그를 활용하여 언더플로우 방지
    
    # 문서가 스팸일 확률
    state['spamProb'] = np.log(numSpam + 1) - np.log(numDocs + numTokens)
    # 문서가 비스팸일 확률
    state['hamProb'] = np.log(numHam + 1) - np.log(numDocs + numTokens)

    # 스팸 문서일 때 각 토큰이 등장할 확률
    state['spamTokenLogProb'] = np.log(spamTokenCounts + 1) - np.log(totalSpamTokens + numTokens)
    # 비스팸 문서일 때 각 토큰이 등장할 확률
    state['hamTokenLogProb'] = np.log(hamTokenCounts + 1) - np.log(totalHamTokens + numTokens)

    return state

def nb_test(matrix, state):
    # 문서의 개수만큼 배열을 0으로 초기화하여 생성
    output = np.zeros(matrix.shape[0])

    # @: 행렬 곱셈 연산 -> 각 문서의 모든 토큰에 대해 로그 확률을 가중합하여 계산
    # +: 확률이 로그로 표현되어 있으니 사실상 곱

    # 각 문서에 대해 스팸일 확률 (벡터)
    spamScores = matrix @ state['spamTokenLogProb'] + state['spamProb']
    # 각 문서에 대해 비스팸일 확률 (벡터)
    hamScores = matrix @ state['hamTokenLogProb'] + state['hamProb']
    
    # 각 문서에 대해 스팸(1) 또는 비스팸(0) 예측
    output = (spamScores > hamScores).astype(int)

    return output

def evaluate(output, label):
    error = (output != label).sum() * 1. / len(output)
    return error

def main():    
    # 훈련 세트 크기 리스트
    train_sizes = [50, 100, 200, 400, 800, 1400]
    test_errors = []
    
    # 각 훈련 세트 크기에 대해 학습 및 테스트
    for size in train_sizes:
        # 훈련 데이터 파일 경로 설정
        trainMatrix, tokenlist, trainCategory = readMatrix(f'./data/hw2_MATRIX.TRAIN.{size}')
        testMatrix, tokenlist, testCategory = readMatrix('./data/hw2_MATRIX.TEST')
        
        state = nb_train(trainMatrix, trainCategory)
        output = nb_test(testMatrix, state)
        
        # 테스트 오류율 계산 및 저장
        error = evaluate(output, testCategory)
        test_errors.append(error)
        print(f"Training size: {size}, Test error: {error:.4f}")
    
    # 학습 곡선 그리기
    plt.plot(train_sizes, test_errors, marker='o')
    plt.xlabel('Training Set Size')
    plt.ylabel('Test Set Error')
    plt.title('Learning Curve (Test Set Error vs. Training Set Size)')
    plt.show()

    # 최적의 훈련 세트 크기 출력
    best_size = train_sizes[np.argmin(test_errors)]
    print(f"Best training set size for lowest test set error: {best_size}")
    
    return

if __name__ == '__main__':
    main()

실행 전 해야할 일

pip install matplotlib

matplotlib 모듈을 설치한다.

  • matplotlib: 그래프를 그리기 위해 필요한 모듈

 

Comments