
Competition01_OTTO-Multi-Objective Recommender System


Competition page: https://www.kaggle.com/competitions/otto-recommender-system/data?select=train.jsonl

This is my first Kaggle competition on recommender systems, so I will document every step in detail, starting from scratch.

Competition overview

OTTO is Germany's largest online store. The task: given the sequence of actions (clicks, add-to-carts, orders) in a user's session, predict which items the user is likely to interact with next (up to 20 per type). For each session we predict three types of behavior:

  • clicks: items the user is likely to click;
  • carts: items the user is likely to add to the cart;
  • orders: items the user is likely to order.

Dataset

Training data:

train.jsonl - the full session data

  • session - unique session ID
  • events - the time-ordered sequence of events in the session
    • aid - item ID
    • ts - event timestamp, in milliseconds
    • type - event type: clicks / carts / orders

import json

# Print the first three lines of the training data
with open("dataSet\\train.jsonl", "r", encoding="utf-8") as f:
    for i, line in enumerate(f):
        data = json.loads(line)
        print(data)
        if i >= 2:
            break


Test data:

test.jsonl - the truncated test sessions; the final behavior to be predicted is not included

We need to predict what the user goes on to browse and buy.

  • type - usually does not include orders; these are what we need to predict

    All other fields are the same as in train.jsonl


Scoring:

Predictions are evaluated with Recall@20, i.e. the number of hits divided by the number of ground-truth items (capped at 20 per session).

Up to 20 items may be predicted per row. Since only recall is measured and wrong guesses carry no penalty, submitting exactly 20 predictions per row is both necessary and recommended.

The final score is a weighted average of the three recall values:

score = 0.10 · R_clicks + 0.30 · R_carts + 0.60 · R_orders
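
To make the metric concrete, here is a minimal sketch of Recall@20 and the weighted score on toy numbers; the helper name and example values are purely illustrative and not part of the competition code. In the real evaluation each recall is aggregated over all test sessions rather than computed per session.

# Illustrative sketch of the evaluation metric (toy data)
def recall_at_20(predicted, ground_truth):
    """predicted: list of up to 20 aids; ground_truth: set of true aids for one session/type."""
    if not ground_truth:
        return 0.0
    hits = len(set(predicted[:20]) & ground_truth)
    return hits / min(20, len(ground_truth))

r_clicks = recall_at_20([1, 2, 3], {2, 9})    # 1 hit out of 2  -> 0.5
r_carts = recall_at_20([1, 2, 3], {7})        # 0 hits out of 1 -> 0.0
r_orders = recall_at_20([1, 2, 3], {1, 3})    # 2 hits out of 2 -> 1.0

score = 0.10 * r_clicks + 0.30 * r_carts + 0.60 * r_orders
print(score)  # 0.65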


Baseline

01_baseline.py

Pure counting: find the 20 items that appear most often across all events in train.jsonl and recommend them to every session, i.e. non-personalized popularity recommendations. The same 20 items are written for clicks, carts and orders in turn to build submission.csv.

Return the 20 most frequent items:

import json
from collections import Counter

from tqdm import tqdm

top_clicks = Counter()

# Count the total number of lines (for the progress bar)
with open('dataSet\\train.jsonl', 'r', encoding='utf-8') as f:
    total_lines = sum(1 for _ in f)

with open('dataSet\\train.jsonl', 'r', encoding='utf-8') as f:
    for line in tqdm(f, total=total_lines, desc="Counting popular items"):
        session = json.loads(line)
        for event in session['events']:
            top_clicks[event['aid']] += 1

# most_common(20) returns the 20 most frequent (item ID, count) pairs
top_items = [str(aid) for aid, _ in top_clicks.most_common(20)]

Write the same items under each event type and generate submission.csv:

import pandas as pd

submission_rows = []

# Count the total number of lines in the test file
with open('dataSet\\test.jsonl', 'r', encoding='utf-8') as f:
    total_test = sum(1 for _ in f)

with open('dataSet\\test.jsonl', 'r', encoding='utf-8') as f:
    for line in tqdm(f, total=total_test, desc="Building submission.csv"):
        session = json.loads(line)
        sid = session['session']
        for t in ['clicks', 'carts', 'orders']:
            submission_rows.append({
                'session_type': f"{sid}_{t}",
                'labels': ' '.join(top_items)
            })

submission = pd.DataFrame(submission_rows)
submission.to_csv('output\\submission.csv', index=False)

Score: 0.00664 (private)


Building a co-visitation matrix (clicks only)

02_single-co-visitation.py

1. Store co-visited items in a nested dictionary

from collections import defaultdict

# Nested dictionary: each value is itself a dictionary
# Outer key: co_visitation[a] -> item a
# Inner key: co_visitation[a][b] -> how many times items a and b co-occur
# defaultdict handles initialization automatically
co_visitation = defaultdict(lambda: defaultdict(int))

2. Build the item co-visitation dictionary. To keep the computation fast, only adjacent clicks are considered.

In fact, if two items appear in the same session but far apart, they are probably only weakly related; pairing every item with every other item in a session would also produce many meaningless pairs and a huge, sparse matrix. Restricting to adjacent clicks drastically reduces the amount of work, so it can be seen as a compact yet precise way to model co-occurrence.

# Build the item co-visitation dictionary
with open(train_path, 'r', encoding='utf-8') as f:
    for line in tqdm(f, total=total_lines, desc='Building co-visitation matrix'):
        session = json.loads(line)
        aids = [event['aid'] for event in session['events'] if event['type'] == 'clicks']
        if len(aids) < 2:  # skip sessions with too few clicks
            continue

        # Only consider adjacent clicks
        for i in range(len(aids) - 1):
            a, b = aids[i], aids[i + 1]
            if a != b:
                co_visitation[b][a] += 1
                co_visitation[a][b] += 1

3. Keep only the top-50 co-visited items per item and save the result

import pickle

# Keep only the top 50 co-visited items for each item
for a in co_visitation:
    co_visitation[a] = dict(sorted(co_visitation[a].items(), key=lambda x: -x[1])[:50])

# Save the co-visitation matrix (binary write mode, as required by pickle)
with open(co_vis_pkl, 'wb') as f:
    pickle.dump(dict(co_visitation), f)

print(f'Co-visitation matrix saved to {co_vis_pkl}')

4. Prediction

  • For each test session, only the 5 most recent click events are used to recall related items as candidates: this captures the user's latest intent and keeps the candidate set small;

  • The 20 items with the highest accumulated score (i.e. the summed co-visitation counts) are taken as the final prediction.

    For now, the same 20 items are submitted for all three event types.

with open(test_path, 'r', encoding='utf-8') as f:
    for line in tqdm(f, total=total_test, desc='Generating recommendations for the test set'):
        session = json.loads(line)
        sid = session['session']

        # Get the most recent click events
        clicked = [event['aid'] for event in session['events'] if event['type'] == 'clicks']
        recent_clicks = clicked[-5:]

        # Candidate pool
        candidates = defaultdict(int)
        for aid in recent_clicks:
            related = co_visitation.get(aid, {})
            for b, score in related.items():
                candidates[b] += score  # accumulate the score

        top_items = [str(aid) for aid, _ in sorted(candidates.items(), key=lambda x: -x[1])[:20]]

        for t in ['clicks', 'carts', 'orders']:
            submission_rows.append({
                'session_type': f"{sid}_{t}",
                'labels': ' '.join(top_items)
            })

submission = pd.DataFrame(submission_rows)
submission.to_csv('output\\submission.csv', index=False)

5. Score: 0.18326


Co-visitation matrices (three event types)

03_multi-co-visitation.py

Modified from script 02.

1. Create three nested dictionaries for the co-visited items

co_visitation_clicks = defaultdict(lambda: defaultdict(int))
co_visitation_cart = defaultdict(lambda: defaultdict(int))
co_visitation_order = defaultdict(lambda: defaultdict(int))

2. Count global item popularity, used to pad the candidate list when co-visitation yields too few items

# Global popularity counter
global_popular = Counter()

with open(train_path, 'r', encoding='utf-8') as f:
    for line in tqdm(f, total=total_lines, desc='Building co-visitation matrices'):
        session = json.loads(line)
        events = session['events']

        # Update the global popularity counts
        for e in events:
            global_popular[e['aid']] += 1

        aids = [event['aid'] for event in session['events'] if event['type'] == 'clicks']
        if len(aids) < 2:  # skip sessions with too few clicks
            continue

3. Consider only adjacent events and build three co-occurrence matrices: click-click, click->cart, and click->order

        for i in range(len(events) - 1):
            a, t1 = events[i]['aid'], events[i]['type']
            b, t2 = events[i + 1]['aid'], events[i + 1]['type']
            if a == b:
                continue

            # click-click co-occurrence
            if t1 == 'clicks' and t2 == 'clicks':
                co_visitation_clicks[a][b] += 1
                co_visitation_clicks[b][a] += 1

            # click -> cart
            if t1 == 'clicks' and t2 == 'carts':
                co_visitation_cart[a][b] += 1

            # click -> order
            if t1 == 'clicks' and t2 == 'orders':
                co_visitation_order[a][b] += 1

4. Keep the top 50 per item in each dictionary and save the matrices

# Keep only the top 50 co-visited items per item in each matrix
def trim_topk(matrix, k=50):
    for a in matrix:
        matrix[a] = dict(sorted(matrix[a].items(), key=lambda x: -x[1])[:k])
    return matrix

co_visitation_clicks = trim_topk(co_visitation_clicks)
co_visitation_cart = trim_topk(co_visitation_cart)
co_visitation_order = trim_topk(co_visitation_order)

with open(co_vis_pkl, 'wb') as f:
    pickle.dump({
        "clicks": dict(co_visitation_clicks),
        "cart": dict(co_visitation_cart),
        "order": dict(co_visitation_order)
    }, f)

5. Prediction: weight the three matrices differently; if fewer than 20 candidates are found, pad with the top-50 popular items

# Top 50 globally popular items
global_popular_top = [aid for aid, _ in global_popular.most_common(50)]

# Weights for the three matrices
weights = {"clicks": 1.0, "cart": 1.5, "order": 2.0}

Candidates from the different co-visitation matrices receive different scores:

        candidates = defaultdict(int)
        for aid in recent_clicks:
            for b, score in co_visitation_clicks.get(aid,{}).items():
                candidates[b] += weights["clicks"] * score
            for b,score in co_visitation_cart.get(aid,{}).items():
                candidates[b] += weights["cart"] * score
            for b,score in co_visitation_order.get(aid,{}).items():
                candidates[b] += weights["order"] * score

Pad with popular items:

        # Pad with popular items
        if len(top_items) < 20:
            need = 20 - len(top_items)
            for aid in global_popular_top:
                if str(aid) not in top_items:
                    top_items.append(str(aid))
                if len(top_items) >= 20:
                    break

6. Score: 0.19178

7. Change the candidate scoring: use the last 20 clicks, giving higher weight to more recent clicks and lower weight to older ones

        # Linearly increasing recency weight: the most recent of the last 20 clicks gets the highest weight
        for idx, aid in enumerate(clicked[-20:]):
            weight = (idx + 1) / 20
            for b, score in co_visitation_clicks.get(aid, {}).items():
                candidates[b] += weight * weights["clicks"] * score
            for b, score in co_visitation_cart.get(aid, {}).items():
                candidates[b] += weight * weights["cart"] * score
            for b, score in co_visitation_order.get(aid, {}).items():
                candidates[b] += weight * weights["order"] * score

8. Score: 0.19413


Co-visitation matrices (three event types + timestamps)

03_2_multi-co-visitation-ts.py

1. Sort events by timestamp to avoid out-of-order sequences

events = sorted(events, key=lambda x: x['ts'])

2. The larger the time gap between two adjacent events, the lower the weight; the smaller the gap, the higher the weight

        # Click events only (definition implied by the surrounding script)
        clicks = [e for e in events if e['type'] == 'clicks']

        # Iterate over adjacent clicks
        for i in range(len(clicks) - 1):
            a, t1, ts1 = clicks[i]['aid'], clicks[i]['type'], clicks[i]['ts']
            b, t2, ts2 = clicks[i + 1]['aid'], clicks[i + 1]['type'], clicks[i + 1]['ts']
            if a == b:
                continue

            # Time difference between the two clicks
            dt = max(1, ts2 - ts1)
            # Time weight: closer in time -> higher weight (the formula can be tuned)
            w = 1.0 / (1 + (dt / 600))  # scaled by 600 (intended as 10 minutes)

            # click-click co-occurrence
            co_visitation_clicks[a][b] += w
            co_visitation_clicks[b][a] += w

        # Handle click -> cart / order (same logic as before, but with the time weight)
        for i in range(len(events) - 1):
            a, t1, ts1 = events[i]['aid'], events[i]['type'], events[i]['ts']
            b, t2, ts2 = events[i + 1]['aid'], events[i + 1]['type'], events[i + 1]['ts']
            if a == b:
                continue

            dt = max(1, ts2 - ts1)
            w = 1.0 / (1 + (dt / 600))

            if t1 == 'clicks' and t2 == 'carts':
                co_visitation_cart[a][b] += w
            if t1 == 'clicks' and t2 == 'orders':
                co_visitation_order[a][b] += w

Generating the candidate sets

04_train-candidates.py

1. Write the candidate set in batches to avoid a MemoryError

batch_size = 100000  # write one file per 100k sessions; tune to the available memory
candidate_rows = []
part_id = 0
total_batches = (total_test // batch_size) + 1

with open(test_path, 'r', encoding='utf-8') as f, tqdm(f, total=total_test, desc='Generating candidates') as session_pbar, tqdm(
        total=total_batches, desc='Writing batches') as batch_pbar:
    for i, line in enumerate(session_pbar):
        session = json.loads(line)
        sid = session['session']

        clicked = [event['aid'] for event in session['events'] if event['type'] == 'clicks']

        candidates = defaultdict(int)
        for idx, aid in enumerate(clicked[-20:]):
            weight = (idx + 1) / 20
            for b, score in co_visitation_clicks.get(aid, {}).items():
                candidates[b] += weight * weights["clicks"] * score
            for b, score in co_visitation_cart.get(aid, {}).items():
                candidates[b] += weight * weights["cart"] * score
            for b, score in co_visitation_order.get(aid, {}).items():
                candidates[b] += weight * weights["order"] * score

        # Keep the top 50 as candidates
        top_items = sorted(candidates.items(), key=lambda x: -x[1])[:50]

        for aid, score in top_items:
            candidate_rows.append({
                "session": sid,
                "candidate": aid,
                "score": score,
                "source": "co_vis"
            })

        # Pad with popular items
        for aid in global_popular_top:
            if aid not in candidates:
                candidate_rows.append({
                    "session": sid,
                    "candidate": aid,
                    "score": 0,
                    "source": "popular"
                })

        # Write one batch to disk
        if (i + 1) % batch_size == 0:
            df = pd.DataFrame(candidate_rows)
            df.to_parquet(f"dataset/candidates_part_{i // batch_size}.parquet", index=False)
            candidate_rows = []  # clear the buffer
            part_id += 1
            batch_pbar.update(1)

# Write the final batch
if candidate_rows:
    df = pd.DataFrame(candidate_rows)
    df.to_parquet(f"dataset/candidates_part_last.parquet", index=False)
    part_id += 1
    batch_pbar.update(1)

print("候选集已分批保存到 dataset/")

2. Merge the candidate files

import glob

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm

files = sorted(glob.glob("dataset/candidates_part_*.parquet"))

writer = None

with tqdm(total=len(files), desc="Merging candidate files") as merge_pbar:
    for f in files:
        df = pd.read_parquet(f)
        table = pa.Table.from_pandas(df)
        if writer is None:
            # Create the writer lazily so it reuses the schema of the first part
            writer = pq.ParquetWriter("dataset/candidates.parquet", table.schema)
        writer.write_table(table)
        merge_pbar.update(1)

if writer:
    writer.close()

print("Candidates merged into dataset/candidates.parquet")

The test candidates can be generated the same way:

04_2_test-candidates.py


Extracting session labels and saving them as a structured parquet file

get_train_labels.py

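The script itself is not reproduced in the post; below is a minimal sketch of what it could do, assuming the labels are simply each session's aids grouped by event type (matching the session / clicks / carts / orders columns that labels_generator reads later). The file paths are placeholders.

import json

import pandas as pd
from tqdm import tqdm

rows = []
with open("dataSet\\train.jsonl", "r", encoding="utf-8") as f:
    for line in tqdm(f, desc="Extracting session labels"):
        session = json.loads(line)
        labels = {"clicks": [], "carts": [], "orders": []}
        for event in session["events"]:
            labels[event["type"]].append(event["aid"])
        rows.append({
            "session": session["session"],
            "clicks": labels["clicks"],
            "carts": labels["carts"],
            "orders": labels["orders"],
        })

# List-valued columns are written as parquet list columns (requires pyarrow)
pd.DataFrame(rows).to_parquet("dataset/train_labels.parquet", index=False)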


Building the training features

05_get-train-features.py

How each feature is computed:

for row in tqdm(candidates_df.itertuples(), total=len(candidates_df), desc="Processing candidates"):
    session = row.session
    cand = row.candidate
    base_score = row.score

    true_items = session_dict.get(session, [])
    if not true_items:
        continue

    # Time-weighted co-visitation features
    co_scores = []
    co_scores_weighted = []
    session_len = len(true_items)
    max_ts = max([ts for _, ts in true_items]) if session_len > 0 else 0

    for aid, ts in true_items:
        score = co_vis.get(aid, {}).get(cand, 0)
        co_scores.append(score)
        # Exponential time-decay weight
        alpha = 0.001
        weight = np.exp(-alpha * (max_ts - ts))
        co_scores_weighted.append(score * weight)

    co_max = np.max(co_scores) if co_scores else 0
    co_mean = np.mean(co_scores) if co_scores else 0
    co_sum = np.sum(co_scores) if co_scores else 0

    co_max_weighted = np.max(co_scores_weighted) if co_scores_weighted else 0
    co_mean_weighted = np.mean(co_scores_weighted) if co_scores_weighted else 0
    co_sum_weighted = np.sum(co_scores_weighted) if co_scores_weighted else 0

Each candidate's feature row has the following structure:

buffer_feat.append({
    "session": session,
    "candidate": cand,  # candidate item
    "base_score": base_score,
    "co_max": co_max,    # max co-visitation score between the session's items and the candidate
    "co_mean": co_mean,  # mean co-visitation score
    "co_sum": co_sum,    # total co-visitation score
    "co_max_weighted": co_max_weighted,  # same statistics with the time-decay weight applied
    "co_mean_weighted": co_mean_weighted,
    "co_sum_weighted": co_sum_weighted,
    "session_len": session_len
})

The test features can be built the same way:

05_2_get-test-features.py


A quick test with LightGBM

1. Stream the labels batch by batch to the training process

def labels_generator(labels_path):
    parquet_file = pq.ParquetFile(labels_path)
    for i in range(parquet_file.num_row_groups):
        batch = parquet_file.read_row_group(i).to_pandas()
        session_dict = {}
        for _, row in batch.iterrows():
            session = row['session']
            session_dict[session] = {
                'clicks': set(row.get('clicks', [])),
                'carts': set(row.get('carts', [])),
                'orders': set(row.get('orders', []))
            }
        yield session_dict  # yield one small batch at a time to the training loop
        del batch
        gc.collect()

The features are streamed in batches the same way.

2. Train three binary classification models with LightGBM

def train_lgb_incremental(features_path, labels_path, target_cols):
    labels_gen = labels_generator(labels_path)
    models = {target: None for target in target_cols}

    parquet_file = pq.ParquetFile(features_path)
    total_batches = parquet_file.num_row_groups

    for batch_idx, batch_df in enumerate(tqdm(feature_batch_generator(features_path, labels_gen),
                                              total=total_batches,
                                              desc="Processing Batches")):
        feature_cols = [c for c in batch_df.columns if c not in ['session', 'candidate',
                                                                 'clicks_label', 'carts_label', 'orders_label']]
        for target_col in tqdm(target_cols, desc=f"Training Models on Batch {batch_idx + 1}", leave=False):
            X = batch_df[feature_cols]
            y = batch_df[target_col]
            lgb_train = lgb.Dataset(X, label=y)

            params = {
                'objective': 'binary',
                'metric': 'auc',
                'boosting_type': 'gbdt',
                'learning_rate': 0.05,
                'num_leaves': 64,
                'max_depth': -1,
                'min_data_in_leaf': 50,
                'feature_fraction': 0.8,
                'bagging_fraction': 0.8,
                'bagging_freq': 5,
                'verbosity': -1
            }

            if models[target_col] is None:
                models[target_col] = lgb.train(params, lgb_train, num_boost_round=100)
            else:
                models[target_col] = lgb.train(params, lgb_train, num_boost_round=100, init_model=models[target_col])

            del X, y, lgb_train
            gc.collect()

        del batch_df
        gc.collect()

    # Save the models
    for target_col, model in models.items():
        model.save_model(f"lgbm_{target_col}.txt")

    return models
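
The feature_batch_generator used above is not shown in the post. Below is a minimal sketch of what it might look like, assuming it reads the features parquet row group by row group and derives the binary *_label columns by checking each candidate against the session's ground-truth sets from labels_generator; how the original script actually pairs feature and label batches is an assumption here.

import pyarrow.parquet as pq


def feature_batch_generator(features_path, labels_gen):
    """Sketch: attach binary labels to each feature row group (assumed batch pairing)."""
    parquet_file = pq.ParquetFile(features_path)
    for i in range(parquet_file.num_row_groups):
        batch_df = parquet_file.read_row_group(i).to_pandas()
        session_dict = next(labels_gen)  # assumption: feature and label row groups line up
        for target in ["clicks", "carts", "orders"]:
            truth = {sid: d[target] for sid, d in session_dict.items()}
            batch_df[f"{target}_label"] = [
                int(cand in truth.get(sid, set()))
                for sid, cand in zip(batch_df["session"], batch_df["candidate"])
            ]
        yield batch_df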

3. Run inference with the three models

06_2_test.py

def predict_submission(features_path, models, topk=20):
    parquet_file = pq.ParquetFile(features_path)
    # print(f"[INFO] Total row groups in parquet: {parquet_file.num_row_groups}")

    results = []

    for rg in tqdm(range(parquet_file.num_row_groups), desc="Predicting"):
        batch = parquet_file.read_row_group(rg).to_pandas()
        # print(f"[DEBUG] Row group {rg}: shape = {batch.shape}")

        batch_results = []  # collect this batch's results in a temporary list first

        for target in ["clicks", "carts", "orders"]:
            model = models[target]
            model_feature_names = model.feature_name()

            # Fill in any missing feature columns
            for f in model_feature_names:
                if f not in batch.columns:
                    batch[f] = 0

            X = batch[model_feature_names]
            batch[f"{target}_pred"] = model.predict(X)

            # Take the top-k candidates per session
            preds = (
                batch.groupby("session")
                .apply(lambda x: x.nlargest(topk, f"{target}_pred")["candidate"].astype(str).tolist())
            )
            preds = preds.reset_index().rename(columns={0: "labels"})
            preds["labels"] = preds["labels"].apply(lambda x: " ".join(x))
            preds["session_type"] = preds["session"].astype(str) + f"_{target}"

            batch_results.append(preds[["session_type", "labels"]])

        # Concatenate and deduplicate within the batch
        batch_results = pd.concat(batch_results, ignore_index=True)
        batch_results = batch_results.drop_duplicates(subset=["session_type"], keep="first")

        results.append(batch_results)

    # Concatenate all batches
    submission = pd.concat(results, ignore_index=True)

    # Deduplicate globally once more to guarantee unique rows
    submission = submission.drop_duplicates(subset=["session_type"], keep="first")

    return submission
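
To tie it together, one possible way to call this function is sketched below: load the saved boosters and write the submission file. The model file names assume target_cols in train_lgb_incremental were the *_label columns, and the feature-file path is a placeholder.

import lightgbm as lgb

# Assumed file names, mirroring save_model(f"lgbm_{target_col}.txt") above
model_files = {
    "clicks": "lgbm_clicks_label.txt",
    "carts": "lgbm_carts_label.txt",
    "orders": "lgbm_orders_label.txt",
}
models = {t: lgb.Booster(model_file=path) for t, path in model_files.items()}

submission = predict_submission("dataset/test_features.parquet", models, topk=20)
submission.to_csv("output\\submission.csv", index=False)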

4. Score: 0.21721


This post is licensed under CC BY 4.0 by the author.