[SKKU DT] 73일차 - MySQL 데이터베이스, FastAPI

2024. 2. 16. 18:34SKKU DT

728x90
반응형

어제에 이어서, createTable 스크립트로 text.contents 테이블을 만드는 코드이다.

import pymysql # orm 아님.
from dotenv import load_dotenv
import os

# Load the database settings from the .env file.
load_dotenv(verbose=True)
DB_HOST = os.getenv('DB_HOST')
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DB_NAME')
DB_PORT = os.getenv('DB_PORT')

connection = pymysql.connect(
    host=DB_HOST,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    database=DB_NAME,
    # int(None) raises TypeError, so fall back to MySQL's default port
    # when DB_PORT is missing (or empty) in the environment.
    port=int(DB_PORT or 3306),
    cursorclass=pymysql.cursors.DictCursor,
)

try:
    with connection.cursor() as cursor:
        # contents.brandId references brand(id), so the brand table has to
        # exist before this statement runs. Presumably it was created
        # earlier with the statement below:
        #     CREATE TABLE
        #         IF NOT EXISTS brand (id INT AUTO_INCREMENT PRIMARY KEY,
        #         name VARCHAR(255))

        # `like` is quoted with backticks because LIKE is an SQL keyword.
        sql = '''
            CREATE TABLE
                IF NOT EXISTS contents (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    brandId INT,
                    image longText, 
                    text longText, 
                    `like` VARCHAR(45), 
                    date VARCHAR(45),
                    FOREIGN KEY(brandId) REFERENCES brand(id)
                )
        '''
        cursor.execute(sql)
    connection.commit()
finally:
    # Release the connection whether or not the statement succeeded.
    connection.close()

실행하면 contents라는 이름의 테이블이 만들어진다.

 

 

이젠 인스타그램 크롤링 코드에 데이터베이스를 합친 코드를 이용해서 긁어온 정보를 데이터베이스에 저장해볼 것이다.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
from dotenv import load_dotenv # pip install python-dotenv
import os
from selenium.common.exceptions import NoSuchElementException
import pandas as pd
import pymysql # orm 아님.
import math

# Read the database and Instagram credentials from the .env file.
load_dotenv(verbose=True)

DB_HOST = os.getenv('DB_HOST')
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DB_NAME')
DB_PORT = os.getenv('DB_PORT')
INSTAGRAM_ID = os.getenv('INSTAGRAM_ID')
INSTAGRAM_PASSWORD = os.getenv('INSTAGRAM_PASSWORD')

# CSS selectors for the page elements the crawler reads or clicks.
# NOTE(review): these look like generated class names, so expect them to
# break whenever Instagram changes its markup.
selector = {
    'id_input': '._aa4b._add6._ac4d._ap35',
    'first_post': '._ac7v.xzboxd6.xras4av.xgc1b0m div a ._aagu > ._aagv + ._aagw',
    'next_btn': '._aaqg._aaqh > ._abl-',
    'cover': '._ac7v.xzboxd6.xras4av.xgc1b0m div ._aagu ._aagv img',
    'text': 'h1._ap3a._aaco._aacu._aacx._aad7._aade',
    'like': 'span a span .html-span.xdj266r.x11i5rnm.xat24cr.x1mh8g0r.xexx8yu.x4uap5.x18d9i69.xkhd6sd.x1hl2dhg.x16tdsg8.x1vvkbs',
    'date': '._aaqe',
}

# Instagram account names of the brands to crawl.
brands = ['nike', 'chanelofficial', 'hermes', 'adidas', 'gucci']

options = webdriver.ChromeOptions()
# Chrome's command-line switch is spelled `--user-agent=<value>`.
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36')
# The options only take effect when they are handed to the driver; the
# previous code built them but started Chrome without them.
driver = webdriver.Chrome(options=options)

driver.get('https://instagram.com')
driver.implicitly_wait(10)  # element lookups wait up to 10 seconds
driver.maximize_window()

def click_nxt():
    """Click the "next post" button and re-apply the 10-second implicit wait."""
    driver.find_element(By.CSS_SELECTOR, selector['next_btn']).click()
    driver.implicitly_wait(10)
    
# Log in to Instagram. The 'id_input' selector matches both login inputs:
# index 0 receives the account id and index 1 the password.
login_inputs = driver.find_elements(By.CSS_SELECTOR, selector['id_input'])
login_inputs[0].send_keys(INSTAGRAM_ID)
password_input = login_inputs[1]
password_input.send_keys(INSTAGRAM_PASSWORD)
password_input.send_keys(Keys.ENTER)  # submit the form
time.sleep(10)  # fixed pause before the crawler starts navigating

# Column buffers for the scraped posts; `data` maps each DataFrame column
# name to its buffer.
text = []
image = []
like = []
date = []
brand = []

data = {
    'text': text,
    'image': image,
    'like': like,
    'date': date,
    'brand': brand,
}
LOOP = 10  # number of posts collected per brand

try:
    # Collect LOOP posts from every brand account.
    for b in brands:
        driver.get(f'https://instagram.com/{b}/')
        driver.implicitly_wait(10)

        # Run the thumbnail selector once instead of once per index.
        covers = driver.find_elements(By.CSS_SELECTOR, selector['cover'])
        if len(covers) < LOOP:
            raise IndexError(
                f'{b}: found {len(covers)} posts, expected at least {LOOP}'
            )
        # extend() mutates the very list stored in `data`, so no later
        # data.update() is needed to make the images visible there.
        image.extend(cover.get_attribute('src') for cover in covers[:LOOP])

        # Open the first post; the following posts are reached via "next".
        driver.find_elements(By.CSS_SELECTOR, selector['first_post'])[0].click()
        driver.implicitly_wait(10)

        for i in range(LOOP):
            try:
                text.append(driver.find_element(By.CSS_SELECTOR, selector['text']).text)
            except NoSuchElementException:  # post without a caption
                text.append('')

            count = driver.find_element(By.CSS_SELECTOR, selector['like']).text
            if count == '':
                # The like count is still empty; wait a second and re-read.
                time.sleep(1)
            like.append(driver.find_element(By.CSS_SELECTOR, selector['like']).text)
            date.append(driver.find_element(By.CSS_SELECTOR, selector['date']).get_attribute('title'))
            brand.append(b)

            # Derive the bound from LOOP so changing LOOP keeps working;
            # no "next" click is needed after the last collected post.
            if i < LOOP - 1:
                click_nxt()
finally:
    # Always shut the browser down, even when scraping fails midway.
    driver.quit()

df = pd.DataFrame(data)
df.to_excel('./instagram.xlsx')

print(df)

# Insert the brand names into the brand table.
# The contents rows (brandId, image, text, like, date) are inserted afterwards.

conn = pymysql.connect(
    host=DB_HOST,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    database=DB_NAME,
    # int(None) raises TypeError; fall back to MySQL's default port.
    port=int(DB_PORT or 3306),
    cursorclass=pymysql.cursors.DictCursor,
)

try:
    # `with` closes the cursor and, unlike `finally: cursor.close()`, cannot
    # trip over an unbound name when conn.cursor() itself fails.
    with conn.cursor() as cursor:
        # IGNORE skips names that already exist. If brand.name carries a
        # UNIQUE index, a plain INSERT fails as a whole on the second run
        # and none of the new names would be stored.
        sql = '''
            INSERT IGNORE INTO brand (name)
            VALUES (%s)
        '''

        # One parameter tuple per row makes the executemany() input explicit.
        cursor.executemany(sql, [(name,) for name in brands])

    conn.commit()

except pymysql.err.DatabaseError as e:
    print(e)

# Resolve every brand name to its id, then insert the scraped posts into the
# contents table.
try:
    with conn.cursor() as cursor:
        # Build one placeholder per brand so the query follows len(brands)
        # instead of being hard-wired to five names. Only the placeholders
        # are formatted in; the values themselves stay parameterized.
        placeholders = ', '.join(['%s'] * len(brands))
        cursor.execute(
            f'SELECT id, name FROM brand WHERE name IN ({placeholders})',
            brands,
        )
        brand_ids = {row['name']: row['id'] for row in cursor.fetchall()}

        # Fail with a readable message instead of an opaque TypeError when a
        # brand row is missing.
        missing = [name for name in brands if name not in brand_ids]
        if missing:
            raise LookupError(f'brands missing from the brand table: {missing}')

        # Select the columns explicitly so the row layout
        # (text, image, like, date, brand) matches the INSERT column list,
        # then swap the trailing brand name for its id.
        columns = ['text', 'image', 'like', 'date', 'brand']
        contents_list = [
            [*values, brand_ids[brand_name]]
            for *values, brand_name in df[columns].to_numpy()
        ]

        sql = '''
            INSERT INTO contents (text, image, `like`, date, brandId)
            VALUES (%s, %s, %s, %s, %s)
        '''

        cursor.executemany(sql, contents_list)
    conn.commit()

except pymysql.err.DatabaseError as e:
    print(e)
finally:
    # Last use of the connection: closing it also discards anything that
    # was not committed.
    conn.close()

 

 

brand 테이블의 name 컬럼에는 UQ(Unique) 제약 조건을 체크해 둔다. 같은 브랜드 이름이 중복으로 저장되는 것을 막기 위해서이다.

 

 

짠 하고 결과가 나와야 하는데 오류가 났다.

 

 


 

 

송금 시스템

- A가 B에게 이체하는 도중 네트워크 오류가 발생해서, A의 계좌에서는 출금되었는데 B는 입금을 받지 못하는 경우를 막기 위해 트랜잭션(transaction)을 활용한다.

import pymysql

conn = pymysql.connect(
    host='localhost',
    user='your_username',
    password='your_password',
    database='your_database',
    autocommit=False,  # disable autocommit so the commit point is explicit
)

# Controlling the transaction lets us choose when to commit.
# For a transfer from A to B: start the transaction before the withdrawal,
# commit (ending the transaction) once the deposit succeeds, and roll back
# if anything fails in between.

try:
    # `with` closes the cursor and avoids referencing an unbound `cursor`
    # in a finally clause when conn.cursor() itself fails.
    with conn.cursor() as cursor:
        # Start the transaction.
        conn.begin()

        # Create the table (fill in the statement).
        # NOTE: in MySQL, DDL such as CREATE TABLE causes an implicit
        # commit, so it cannot be undone by the rollback below.
        create_table_query = '''

        '''

        cursor.execute(create_table_query)

        # Insert the data (fill in the statement).
        insert_query = '''

        '''

        cursor.execute(insert_query)

        # Update the data (fill in the statement).
        update_query = '''

        '''
        cursor.execute(update_query)

    conn.commit()
    print('트랜젝션 완료')
except Exception as e:
    conn.rollback()
    print('트랜젝션 롤백')
    # Show why the transaction was rolled back instead of hiding the error.
    print(e)
finally:
    conn.close()

 

 


 

 

파이썬 Rest API server

Flask와 Django는 예전부터 쓰이던 프레임워크이고, 요즘은 FastAPI를 많이 쓴다.

 

https://fastapi.tiangolo.com/ko/

 

FastAPI

FastAPI framework, high performance, easy to learn, fast to code, ready for production

fastapi.tiangolo.com

 

FastAPI 프레임워크의 장점

 

 

홈페이지의 튜토리얼을 조금 따라가보면,

pip install fastapi 설치

pip install "uvicorn[standard]" 설치

from typing import Union

from fastapi import FastAPI

# Application object; uvicorn serves it as `main:app`.
# Handlers are documented with comments rather than docstrings because
# FastAPI publishes handler docstrings as the operation description.
app = FastAPI()


@app.get("/")
def read_root():
    # Fixed greeting returned for the root path.
    greeting = {"Hello": "World"}
    return greeting


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    # item_id is taken from the URL path; q is an optional query parameter
    # that defaults to None.
    payload = {"item_id": item_id}
    payload["q"] = q
    return payload

main.py 파일을 만들고 위의 코드 형식을 사용할 수 있다.

 

터미널에 uvicorn main:app --reload 입력,

 

http://127.0.0.1:8000/items/5?q=somequery 링크로 들어가면 {"item_id": 5, "q": "somequery"} 형태의 JSON 응답을 확인할 수 있다.

 

 

심화 예제를 실행하면,

from typing import Union

from fastapi import FastAPI
from pydantic import BaseModel

# Application object the routes below are registered on.
# Comments are used instead of docstrings because FastAPI publishes
# docstrings in the generated API schema.
app = FastAPI()


class Item(BaseModel):
    # Request body accepted by PUT /items/{item_id}.
    name: str
    price: float
    is_offer: Union[bool, None] = None  # optional; defaults to None


@app.get("/")
def read_root():
    # Fixed greeting returned for the root path.
    greeting = {"Hello": "World"}
    return greeting


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    # item_id is taken from the URL path; q is an optional query parameter
    # that defaults to None.
    payload = {"item_id": item_id}
    payload["q"] = q
    return payload


@app.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
    # Echo the submitted item's name together with the id from the path.
    response = {"item_name": item.name}
    response["item_id"] = item_id
    return response

 

http://127.0.0.1:8000/docs 링크에서 자동으로 생성된 API 문서를 볼 수 있다.

 

 


 

 

SQL 데이터베이스에 대한 튜토리얼

https://fastapi.tiangolo.com/ko/tutorial/sql-databases/

 

SQL (Relational) Databases - FastAPI

FastAPI framework, high performance, easy to learn, fast to code, ready for production

fastapi.tiangolo.com

 

 

Docker

https://fastapi.tiangolo.com/ko/deployment/docker/

 

컨테이너의 FastAPI - 도커 - FastAPI

FastAPI framework, high performance, easy to learn, fast to code, ready for production

fastapi.tiangolo.com

728x90
반응형