티스토리 뷰
태블로에서 RFM 대시보드를 만들고 이를 활용해서 CRM 캠페인 액션들을 수행한뒤 유져벌 RFM 세그먼트 이동을 관찰해야 한다.
하지만 이를 정기적으로 다운로드 받아서 구글 시트에 저장해야하는 번거로움이 있다.
반복적인 업무기 때문에 자동화시킬 필요가 있다고 생각하여 구현해본 케이스를 기록도 해볼겸 공유해볼까 한다.

구글링을 해보니 유저 아이디별 라벨링된 세그먼트명을 태블로 RestAPI 를 활용해서 csv 파일로 다운 받을 수 있다.
아주 다행스럽게 파이썬 라이브러리가 있었다.
https://github.com/divinorum-webb/tableau-api-lib
GitHub - divinorum-webb/tableau-api-lib: An API library that allows developers to call on the methods listed in Tableau's REST A
An API library that allows developers to call on the methods listed in Tableau's REST API documentation. - divinorum-webb/tableau-api-lib
github.com
그러면 매번 다운로드해서 관리하지 않아도 정기적으로 RFM 세그먼트의 히스토리 데이터를 적재할 수 있게되고 더불어 이 데이터를 API로 만들어서 Braze에서 쓸 수 있게되면 금상첨화!
구현해보는 걸 나눠서 기록해보려 한다.
다음 글에서는 GCP에서 빅쿼리를 쿼리하는 Cloud Function 로 API를 구현하는 방법을 포스팅 해볼 예정이다.
GCP를 활용했고 구현을 위한 프로세스는 아래와 같다.

일단 Cloud Functions을 2개를 만든다.
하나는 Tableau Rest API 로 데이터를 소싱하는 함수하나 다른 하나는 Cloud Storage에 적재된 CSV 파일을 빅쿼리에 적재하는 함수다.
1. 클라우드 스케줄러로 데이터를 소싱할 시간을 지정해서 Pub/Sub 메시지를 예약.
2. Pub/Sub 메시지로 Cloud Functions을 호출.
3. 함수가 호출되면 데이터를 CSV 로 추출해서 GCS(Cloud Storage)에 적재한다. (그냥 빅쿼리에 바로 적재해도 되지만 혹시 모르니 파일로 저장해둔다.)
4. csv 파일이 GCS에 저장되면 빅쿼리에 데이터를 적재하는 함수를 트리거 시킨다.
5. 빅쿼리에 적재한다.
이렇게 되면 지정한 시간대로 유저별 세그먼트 라벨이 빅쿼리에 적재되게 된다. 히스토리 데이터로 저장되니 이걸 시각화하면 세그먼트의 변화량을 관찰할 수 있게 된다
우선 GCS에 태블로의 view를 csv 파일로 저장하는 코드는 아래와 같다
Cloud Storage 에 Bucket 은 미리 만들어 두고 함수를 만드시길!
import os
from datetime import datetime
from io import BytesIO
import pandas as pd
from google.cloud import storage
from tableau_api_lib import TableauServerConnection
from config import settings
bucket_name = 'bucket_rfm_history'
blob_name = f"rfm_stat_{datetime.now().strftime('%Y-%m-%d')}.csv"
def save_csv_to_gcs(csv_data):
"""
Cloud Storage에 CSV 데이터를 저장합니다.
기존 blob이 존재하더라도 새 데이터로 덮어씁니다.
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
print(f"Saving latest data to '{bucket_name}/{blob_name}'...")
blob.upload_from_string(csv_data, content_type='text/csv')
print(f"Data saved successfully to '{bucket_name}/{blob_name}'.")
def format_date_columns(df):
"""
날짜 컬럼의 형식을 'YYYY-MM-DD'로 변경
"""
try:
# 날짜 컬럼 이름 확인
if 'Day of #1-1. Recency' in df.columns:
# 공백 제거 및 한글 날짜를 ISO 형식(YYYY-MM-DD)으로 변환
df['Day of #1-1. Recency'] = (
df['Day of #1-1. Recency']
.astype(str) # 데이터가 문자열 형식임을 보장
.str.strip() # 공백 제거
)
df['Day of #1-1. Recency'] = pd.to_datetime(
df['Day of #1-1. Recency'],
format='%Y년 %m월 %d일')
).dt.strftime('%Y-%m-%d')
print(f"Formatted 'Day of #1-1. Recency' column to 'YYYY-MM-DD'.")
except Exception as e:
print(f"Error formatting date columns: {e}")
return df
def fetch_tableau_data():
"""
Tableau에서 데이터를 가져와 DataFrame으로 처리하고 CSV 데이터를 반환합니다.
"""
# Tableau config
config = {
'tableau_prod': {
'server': settings['TABLEAU_URL'],
'api_version': '3.24',
'personal_access_token_name': settings['TABLEAU_PERSONAL_ACCESS_TOKEN_NAME'],
'personal_access_token_secret': settings['TABLEAU_PERSONAL_ACCESS_TOKEN_SECRET'],
'site_name': settings['TABLEAU_SITE_ID'],
'site_url': settings['TABLEAU_SITE_ID'],
}
}
# Tableau 서버 연결
conn = TableauServerConnection(config_json=config, env='tableau_prod')
conn.sign_in()
print("Connected to Tableau Server!")
# 데이터 가져오기
view_id = "d0c284ea-ce86-4b9d-8864-b0bf0105118f"
response = conn.query_view_data(view_id=view_id)
# CSV 데이터를 DataFrame으로 변환
csv_data = BytesIO(response.content)
df = pd.read_csv(csv_data)
# created_at 컬럼 추가
today_date = datetime.now().strftime('%Y-%m-%d')
df['created_at'] = today_date
# 날짜 컬럼 형식 변경
df = format_date_columns(df)
# DataFrame을 CSV로 변환
csv_with_created_at = df.to_csv(index=False)
# 데이터 확인 로그
print(f"DataFrame shape: {df.shape}")
print(f"DataFrame preview: \n{df.head()}")
conn.sign_out()
return csv_with_created_at
def main(request, context):
"""
Cloud Function의 엔트리포인트 함수.
"""
try:
# Tableau 데이터 가져오기
csv_data = fetch_tableau_data()
# GCS에 CSV 저장
save_csv_to_gcs(csv_data)
return f"Data fetched and saved to {bucket_name}/{blob_name}.", 200
except Exception as e:
print(f"Error: {str(e)}")
return f"Error: {str(e)}", 500
이제 csv가 자동으로 저장되었으니 빅쿼리로 퍼다 날라야 되니 그 추가로 함수를 만들어 준다

이 때 이 함수는 위에서 언급한대로선택한 버킷에서 파일 완료/생성 시 되어야 된다
import os
import logging
from datetime import datetime
import pandas as pd
from io import StringIO
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
bucket_name = "bucket_rfm_history"
dataset_id = "bucket_data"
table_id = "rfm_history"
# 컬럼 매핑 딕셔너리
COLUMN_MAPPING = {
"RFM score": "rfm_score",
"#1-2. Frequency": "frequency",
"#1-3. Monetary": "monetary",
"#4. RFM Seg": "rfm_seg_nm",
"Cust Id": "cust_id",
"Day of #1-1. Recency": "recency",
"created_at": "created_at",
}
# 컬럼 타입 지정
COLUMN_TYPES = {
"rfm_score": "int64",
"frequency": "int64",
"monetary": "int64",
"recency": "datetime64[ns]",
"created_at": "datetime64[ns]",
}
def sanitize_and_map_columns(df):
"""
컬럼 이름을 요청된 이름으로 매핑하고, 소문자로 변환합니다.
"""
new_columns = []
for col in df.columns:
if col in COLUMN_MAPPING:
# COLUMN_MAPPING에 정의된 컬럼 이름 우선 사용
new_columns.append(COLUMN_MAPPING[col])
else:
# 허용되지 않는 문자 제거 및 알파벳, 숫자, 밑줄(_)로만 구성
sanitized_col = ''.join(char if char.isalnum() or char == '_' else '_' for char in col)
sanitized_col = sanitized_col.lower().strip('_') # 소문자 변환 및 앞뒤 밑줄 제거
# 첫 글자가 '_'로 시작하지 않도록 수정
if sanitized_col.startswith('_'):
sanitized_col = sanitized_col[1:]
new_columns.append(sanitized_col)
df.columns = new_columns
return df
def convert_column_types(df):
"""
컬럼 타입을 지정된 형식으로 변환합니다.
"""
for col, dtype in COLUMN_TYPES.items():
if col in df.columns:
try:
if "datetime" in dtype:
df[col] = pd.to_datetime(df[col], errors="coerce")
else:
df[col] = df[col].astype(dtype)
logger.info(f"Converted column '{col}' to type '{dtype}'.")
except Exception as e:
logger.error(f"Error converting column '{col}' to type '{dtype}': {e}")
return df
def get_bigquery_table_schema(bigquery_client, dataset_id, table_id):
"""
기존 BigQuery 테이블의 스키마를 가져옵니다.
"""
try:
table_ref = bigquery_client.dataset(dataset_id).table(table_id)
table = bigquery_client.get_table(table_ref)
logger.info(f"Existing table schema: {table.schema}")
return table.schema
except NotFound:
logger.info(f"Table {table_id} not found. A new table will be created.")
return None
def load_csv_to_bigquery(bucket_name, file_name, dataset_id, table_id):
"""
Cloud Storage에 업로드된 CSV 파일을 BigQuery 테이블에 로드하는 함수.
"""
if not file_name.endswith('.csv'):
logger.info(f"Skipping non-CSV file: {file_name}")
return f"File {file_name} is not a CSV. Skipped."
logger.info(f"Processing file: {file_name} from bucket: {bucket_name}")
# GCS에서 파일 다운로드
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
csv_data = blob.download_as_string().decode('utf-8')
# CSV 데이터를 DataFrame으로 로드
df = pd.read_csv(StringIO(csv_data))
# 컬럼 이름 정리 및 매핑
df = sanitize_and_map_columns(df)
# 컬럼 타입 변환
df = convert_column_types(df)
logger.info(f"Sanitized and mapped columns: {df.columns.tolist()}")
logger.info(f"Loaded DataFrame with shape: {df.shape}")
logger.info(f"DataFrame preview: \n{df.head()}")
# BigQuery 클라이언트 초기화
bigquery_client = bigquery.Client()
# 테이블 스키마 확인 또는 생성
existing_schema = get_bigquery_table_schema(bigquery_client, dataset_id, table_id)
if not existing_schema:
logger.info(f"Creating new table {table_id}.")
schema = [
bigquery.SchemaField(
name,
bigquery.enums.SqlTypeNames.INTEGER if dtype == "int64"
else bigquery.enums.SqlTypeNames.DATE if "datetime" in dtype
else bigquery.enums.SqlTypeNames.STRING) # 기본값: 문자열
for name, dtype in COLUMN_TYPES.items()
]
# DataFrame 컬럼 중 `COLUMN_TYPES`에 없는 컬럼도 문자열로 추가
for col in df.columns:
if col not in COLUMN_TYPES:
schema.append(bigquery.SchemaField(col, bigquery.enums.SqlTypeNames.STRING))
table_ref = bigquery_client.dataset(dataset_id).table(table_id)
table = bigquery.Table(table_ref, schema=schema)
bigquery_client.create_table(table)
logger.info(f"Table {table_id} created.")
else:
# 기존 스키마와 맞춰 DataFrame 조정
schema_field_names = [field.name for field in existing_schema]
df = df[[col for col in df.columns if col in schema_field_names]]
logger.info(f"Adjusted DataFrame to match existing schema: {df.columns.tolist()}")
# BigQuery로 데이터 삽입
table_ref = bigquery_client.dataset(dataset_id).table(table_id)
job = bigquery_client.load_table_from_dataframe(df, table_ref)
job.result()
logger.info(f"Loaded {df.shape[0]} rows into {dataset_id}:{table_id}.")
return f"Loaded {df.shape[0]} rows into {dataset_id}:{table_id}."
def get_latest_file_from_bucket(bucket_name):
"""
Cloud Storage에서 가장 최근 파일 이름 반환.
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blobs = list(bucket.list_blobs())
if not blobs:
raise ValueError("No files found in bucket.")
blobs.sort(key=lambda blob: blob.time_created, reverse=True)
return blobs[0].name
def main(event, context):
"""
Cloud Function 엔트리포인트.
"""
try:
file_name = get_latest_file_from_bucket(bucket_name)
result = load_csv_to_bigquery(bucket_name, file_name, dataset_id, table_id)
logger.info(result)
return result
except Exception as e:
logger.error(f"Error: {str(e)}")
return f"Error: {str(e)}"
이 함수가 동작하면 이제 빅쿼리에 아래와 같이 적재가 된다.

이제 여기서 이걸 브레이즈(Braze) 에 API 를 쓸 수 있게 되면 더 편리해질 것 같다.
다음 편에서는 GCP 에서 Cloud Functions 를 활용해서 빅쿼리를 쿼리하는 API 를 만드는 방법을 소개해볼 예정이다.
https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api.htm
Tableau Server REST API
With the Tableau Server REST API you can manage and change Tableau Server, Tableau Cloud site, and Prep Conductor resources programmatically, using HTTP
help.tableau.com
'IT 관련' 카테고리의 다른 글
브레이즈 커넥티드 컨텐츠(Connected Content)로 활용할 API 만들기 (feat. Google Cloud Platform) (0) | 2025.01.23 |
---|---|
GoogleAnalytics4 세그먼트 제대로 활용하기 (0) | 2024.08.19 |
B2B SaaS MRR 계산하기 - 구글 스프레드시트에 저장하고 데이터 스튜디어로 시각화하기(python) (0) | 2023.06.10 |
Mixpanel 믹스패널 VS Google Analytics(GA4) 차이점 정리 (1) | 2022.10.05 |