import unittest
from datetime import datetime
from io import BytesIO
from unittest.mock import patch

from openpyxl import Workbook, load_workbook

from xunfang.service import jiemiansanlei_service as service


class TestJiemiansanleiService(unittest.TestCase):
    def test_query_classified_filters_street_rows_before_paging(self) -> None:
        rows = [
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "001",
                "station_name": "A所",
                "call_time": "2026-03-01 10:00:00",
                "address": "地址1",
                "lng": "113.1",
                "lat": "22.1",
                "case_type_name": "盗窃",
                "pred_label": "街面与公共区域",
                "pred_prob": 0.9,
            },
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "002",
                "station_name": "B所",
                "call_time": "2026-03-01 11:00:00",
                "address": "地址2",
                "lng": "113.2",
                "lat": "22.2",
                "case_type_name": "盗窃",
                "pred_label": "住宅小区",
                "pred_prob": 0.8,
            },
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "003",
                "station_name": "C所",
                "call_time": "2026-03-01 12:00:00",
                "address": "地址3",
                "lng": "113.3",
                "lat": "22.3",
                "case_type_name": "盗窃",
                "pred_label": "街面与公共区域",
                "pred_prob": 0.7,
            },
        ]

        with patch.object(service, "_fetch_rows_for_filters", return_value=rows), patch.object(
            service, "_append_predictions", side_effect=lambda _rows: None
        ):
            result = service.query_classified(
                start_time="2026-03-01 00:00:00",
                end_time="2026-03-02 00:00:00",
                leixing_list=["盗窃"],
                source_list=["原始"],
                page=2,
                page_size=1,
                street_only=True,
                street_filter_mode="model",
                minor_only=False,
            )

        self.assertEqual(result["total"], 2)
        self.assertEqual(result["page"], 2)
        self.assertEqual(len(result["rows"]), 1)
        self.assertEqual(result["rows"][0]["分类结果"], "街面与公共区域")

    def test_query_classified_filters_by_case_contents_keywords(self) -> None:
        rows = [
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "001",
                "station_name": "A所",
                "call_time": "2026-03-01 10:00:00",
                "address": "地址1",
                "case_contents": "群众报警称路口有人争吵",
                "replies": "",
                "case_type_name": "盗窃",
            },
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "002",
                "station_name": "B所",
                "call_time": "2026-03-01 11:00:00",
                "address": "地址2",
                "case_contents": "群众报警称家中发生纠纷",
                "replies": "到场处置",
                "case_type_name": "盗窃",
            },
        ]

        with patch.object(service, "_fetch_rows_for_filters", return_value=rows), patch.object(
            service, "_append_predictions", side_effect=lambda _rows: None
        ):
            result = service.query_classified(
                start_time="2026-03-01 00:00:00",
                end_time="2026-03-02 00:00:00",
                leixing_list=["盗窃"],
                source_list=["原始"],
                page=1,
                page_size=None,
                street_only=True,
                street_filter_mode="content_road",
                minor_only=False,
            )

        self.assertEqual(result["total"], 1)
        self.assertEqual(result["rows"][0]["警情地址"], "地址1")
        self.assertEqual(result["street_filter"]["label"], "街面(报警内容-路面)")
        self.assertEqual(result["street_filter"]["fields"], ["报警内容"])
        self.assertIn("路口", result["street_filter"]["keywords"])

    def test_query_classified_uses_recommended_include_and_exclude_keywords(self) -> None:
        rows = [
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "001",
                "station_name": "A所",
                "call_time": "2026-03-01 10:00:00",
                "address": "人民路广场门口",
                "case_contents": "群众报警称有人争吵",
                "replies": "",
                "case_type_name": "盗窃",
            },
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "002",
                "station_name": "B所",
                "call_time": "2026-03-01 11:00:00",
                "address": "人民路小区门口",
                "case_contents": "群众报警称有纠纷",
                "replies": "",
                "case_type_name": "盗窃",
            },
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "003",
                "station_name": "C所",
                "call_time": "2026-03-01 12:00:00",
                "address": "办公楼内",
                "case_contents": "群众报警称有纠纷",
                "replies": "",
                "case_type_name": "盗窃",
            },
        ]

        with patch.object(service, "_fetch_rows_for_filters", return_value=rows), patch.object(
            service, "_append_predictions", side_effect=lambda _rows: None
        ):
            result = service.query_classified(
                start_time="2026-03-01 00:00:00",
                end_time="2026-03-02 00:00:00",
                leixing_list=["盗窃"],
                source_list=["原始"],
                page=1,
                page_size=None,
                street_only=True,
                street_filter_mode="recommended",
                minor_only=False,
            )

        self.assertEqual(result["total"], 1)
        self.assertEqual(result["rows"][0]["警情地址"], "人民路广场门口")
        self.assertEqual(result["street_filter"]["label"], "街面")
        self.assertEqual(result["street_filter"]["fields"], ["警情地址", "报警内容", "处警情况(清洗后)"])
        self.assertIn("路", result["street_filter"]["keywords"])
        self.assertIn("小区", result["street_filter"]["exclude_keywords"])
        self.assertIn("工地", result["street_filter"]["exclude_keywords"])

    def test_clean_replies_text_keeps_feedback_and_drops_dispatch_logs(self) -> None:
        raw_replies = (
            "暂无[2026-04-23 18:42:24 云浮市局指挥中心 张三]\n"
            "派警至管辖单位：测试派出所\n\n"
            "[2026-04-23 18:42:25 测试派出所 李四]\n"
            "警情送达李四警务通\n\n"
            "[2026-04-23 18:57:52 测试派出所 李四]\n"
            "【过程反馈】：民警到达现场了解情况。\n\n"
            "[2026-04-23 20:30:09 测试派出所 李四]\n"
            "【结警反馈】：警情部位：城区道路，警情处理结果说明：已处理完毕。\n"
        )

        cleaned = service.clean_replies_text(raw_replies)

        self.assertIn("过程反馈：民警到达现场了解情况。", cleaned)
        self.assertIn("结警反馈：警情部位：城区道路", cleaned)
        self.assertNotIn("派警至管辖单位", cleaned)
        self.assertNotIn("警情送达", cleaned)

    def test_query_classified_none_mode_does_not_filter_street_rows(self) -> None:
        rows = [
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "001",
                "station_name": "A所",
                "call_time": "2026-03-01 10:00:00",
                "address": "人民路广场门口",
                "case_contents": "",
                "replies": "",
                "case_type_name": "盗窃",
            },
            {
                "leixing": "盗窃",
                "source": "原始",
                "bureau": "云城分局",
                "station_no": "002",
                "station_name": "B所",
                "call_time": "2026-03-01 11:00:00",
                "address": "办公楼内",
                "case_contents": "",
                "replies": "",
                "case_type_name": "盗窃",
            },
        ]

        with patch.object(service, "_fetch_rows_for_filters", return_value=rows), patch.object(
            service, "_append_predictions", side_effect=lambda _rows: None
        ):
            result = service.query_classified(
                start_time="2026-03-01 00:00:00",
                end_time="2026-03-02 00:00:00",
                leixing_list=["盗窃"],
                source_list=["原始"],
                page=1,
                page_size=None,
                street_only=False,
                street_filter_mode="none",
                minor_only=False,
            )

        self.assertEqual(result["total"], 2)
        self.assertEqual(result["street_filter"]["label"], "不限街面")

    def test_get_street_filter_description_for_model_mode(self) -> None:
        info = service.get_street_filter_description("model")

        self.assertEqual(info["label"], "街面(模型)")
        self.assertEqual(info["fields"], ["警情地址"])
        self.assertEqual(info["keywords"], [service.STREET_LABEL])

    def test_build_case_payload_uses_minor_case_mark_and_original_codes(self) -> None:
        payload = service._build_case_payload(  # noqa: SLF001
            start_time="2026-03-01 00:00:00",
            end_time="2026-03-02 00:00:00",
            source="原始",
            code_csv="0101,0102",
            minor_only=True,
            page_num=1,
            page_size=200,
        )

        self.assertEqual(payload["newOriCharaSubclassNo"], "0101,0102")
        self.assertEqual(payload["newCharaSubclassNo"], "")
        self.assertEqual(payload["caseMarkNo"], service.MINOR_CASE_MARK_NO)

    def test_fetch_source_rows_uses_confirmed_codes_and_matches_leixing(self) -> None:
        with patch.object(
            service.api_client,
            "get_case_list",
            return_value={
                "code": 0,
                "total": 1,
                "rows": [
                    {
                        "caseNo": "A001",
                        "newCharaSubclassNo": "0201,0202",
                        "cmdName": "云安分局",
                        "dutyDeptNo": "1001",
                        "dutyDeptName": "巡逻队",
                        "callTime": "2026-03-01 12:00:00",
                        "occurAddress": "测试地址",
                    }
                ],
            },
        ) as mock_case_list:
            rows = service._fetch_source_rows(  # noqa: SLF001
                start_time="2026-03-01 00:00:00",
                end_time="2026-03-02 00:00:00",
                source="确认",
                leixing="侵财类",
                code_list=["0201", "0301"],
                minor_only=True,
            )

        payload = mock_case_list.call_args.args[0]
        self.assertEqual(payload["newCharaSubclassNo"], "0201,0301")
        self.assertEqual(payload["newOriCharaSubclassNo"], "")
        self.assertEqual(payload["caseMarkNo"], service.MINOR_CASE_MARK_NO)
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0]["leixing"], "侵财类")
        self.assertEqual(rows[0]["bureau"], "云安分局")

    def test_export_report_only_uses_time_filters_for_db_query(self) -> None:
        workbook = Workbook()
        default_sheet = workbook.active
        workbook.remove(default_sheet)
        for name in list(service.REPORT_LEIXING_LIST) + ["三类合计"]:
            sheet = workbook.create_sheet(title=name)
            sheet["A6"] = ""

        db_row = {
            "caseno": "A001",
            "leixing": "人身伤害类",
            "yuanshiqueren": "原始",
            "分局": "云城分局",
            "派出所编号": "001",
            "派出所名称": "测试所",
            "报警时间": "2026-03-01 10:00:00",
            "警情地址": "测试路口",
            "报警内容": "测试内容",
            "处警情况": "已处置",
            "经度": "113.1",
            "纬度": "22.1",
            "jq_type": "测试类型",
        }

        def fake_append_predictions(rows):
            for row in rows:
                row["pred_label"] = service.STREET_LABEL
                row["pred_prob"] = 0.99

        with patch.object(service.os.path, "exists", return_value=True), patch(
            "openpyxl.load_workbook",
            return_value=workbook,
        ), patch.object(
            service,
            "fetch_db_jingqings",
            return_value=[db_row],
        ) as mock_fetch, patch.object(service, "_append_predictions", side_effect=fake_append_predictions):
            file_bytes, _mimetype, _filename = service.export_report(
                start_time="2026-03-01 00:00:00",
                end_time="2026-03-02 00:00:00",
                hb_start_time="2026-02-22 00:00:00",
                hb_end_time="2026-02-23 00:00:00",
            )

        self.assertEqual(mock_fetch.call_count, 2)
        for call in mock_fetch.call_args_list:
            query = call.args[0]
            self.assertFalse(query.minor_only)
            self.assertEqual(list(query.leixing_list), list(service.REPORT_LEIXING_LIST))
            self.assertEqual(list(query.source_list), ["原始", "确认"])

        exported = load_workbook(BytesIO(file_bytes))
        self.assertEqual(exported["人身伤害类"]["C6"].value, 1)
        self.assertEqual(exported["三类合计"]["C6"].value, 1)

    def test_build_report_counts_uses_recommended_filter_mode(self) -> None:
        rows = [
            {
                "case_no": "A001",
                "leixing": "人身伤害类",
                "source": "原始",
                "bureau": "云城分局",
                "call_time": "2026-03-01 10:00:00",
                "address": "",
                "case_contents": "",
                "replies": "【结警反馈】：警情部位：广场，警情处理结果说明：民警在广场附近处置完毕",
            },
            {
                "case_no": "A002",
                "leixing": "人身伤害类",
                "source": "原始",
                "bureau": "云城分局",
                "call_time": "2026-03-01 11:00:00",
                "address": "",
                "case_contents": "",
                "replies": "【过程反馈】：民警在路边村口处置",
            },
        ]

        counts = service._build_report_counts(  # noqa: SLF001
            rows_year=rows,
            rows_last_year=[],
            street_filter_mode="recommended",
            segments_year=[
                (
                    "current",
                    datetime(2026, 3, 1, 0, 0, 0),
                    datetime(2026, 3, 2, 0, 0, 0),
                    "C",
                    "D",
                )
            ],
            segments_last_year=[],
        )

        self.assertEqual(counts[("人身伤害类", "云城分局", "C")], 1)
        self.assertEqual(counts[("三类合计", "ALL", "C")], 1)


if __name__ == "__main__":
    unittest.main()
