ブログ

  • AI空模様告知システム

    まあ空を見れば分かる事なので絶対必要という機能では無いのですが、農家の方などは、実際の現地の年間空模様を記録して保管し1年の作物作り計画を立てる際に利用出来ます。
    私は、今山奥に住んでいますが、今まで農業をした経験が全く無いので今のところ必要ありませんが
    暇なので作って見たいと思いました。
    webでググったら「全天力メラを用いた空の状態観測手法の開発」山下、吉村という論文を見つけました、この論文の内容を参考にスクリプトを作りました。
    最初、Raspberry Pi3とPi camera3で作成しました。
    opencvを使って2値化などの方法をいくつか試しましたが、色々な場面、例えば太陽が画角に入
    った時の処理などがどうしても上手く行きませんでした。
    太陽をマスクして太陽以外平均的なSI値に置換処理などを行ってみましたが、私の腕が悪いのか思っ
    たような結果は出ませんでした。
    そこでやっぱり「今時はAIじゃない。」という事になりCNNでの処理に切り替えました。

    (1) 空模様素材の収集
    使用カメラの件
    従来小型のRaspberry Pi4やPi5にAI Picamera、Nvidia ORIN Nanoなどを設置運用していましたが、最終的に検知反応速度の遅さや電源確保の難しさや屋外設置での耐久性などの問題で限界を感じておりました。
    そこで比較的取り扱いし易い市販の安価な監視カメラを何とか利用出来ないか?と考えた訳です。
    しかし、大体の市販の監視カメラは、メーカー独自の通信をしており我々が手を出せないの物が殆です。
    その中で唯一使えるのが、Tapo-C310A等のRTSP通信を開放しているカメラです。
    そこで今回は、このカメラを使用した空模様告知システムを構築します。

    Tapo C310A監視カメラ

    Tapoカメラの設定について
    カメラの説明書は以下のurlサイトにあります。
    https://www.tp-link.com/jp/support/download/tapo-c310/
    マニュアルのところに「日本語版Tapoカメラガイド」というのがありますのでダウンロードして使用します。
    Tapoカメラは監視カメラですが、今回は監視の機能は使いませんので設定はすべて無効にします。
    一番大事な部分は「高度な設定」という項目で「カメラのアカウント」->「アカウント情報」->「ユー
    ザー名」「パスワード」を設定します。
    「ネットワーク設定」->「IPアドレス」「サブネットマスク」「ゲートウエイ」「DNS」の項目を設定し
    ます。
    この情報は、あとでスクリプト内からカメラを呼び出す際に使用します。
    rtsp://ユーザー名:パスワード@IPアドレス:554/stream1  (半角アルファベット文字で指定して下さい)

    素材の収集用スクリプト(Raspberry Piで収集 Rasspberry Pi3でも可能)
    一番大変なのは、素材の収集ですが、これは先に試作失敗したBI -SIのスクリプトを使って大まかに収
    集し、そのあとで目視の手作業で分類しました。
    カメラは最初Pi camera3を使っていましたが、これらの屋外での運用は意外に面倒で灼熱の太陽や大
    降りの雨雪に耐える構造にしないといけません。
    また、Pi camera3はそのままでは基板が露出してぶらぶらしているので固定にも苦労します。
    取り付けたり外したり試行錯誤していたらとうとう壊してしまいました。
    頭に来たのでPi camera3はやめることにしました。
    同時に屋外にRaspberry Piを設置するという考えも問題が多いのでやめました。

    Tapo C310Aは、画角が100度という事でPi camera3の広角タイプ102度より狭いですが、私の場合山の
    中なので水平から40度~45度位は山が映り込みますのでいずれにしてもマスクしないと使えません
    Tapoで十分です、しかし実際の画角はもっと狭いような気もしますが特に気にしません。
    TapoカメラよりPi camera3の方がこのBI-SI方式ではいい結果が出ていました。
    Tapoカメラでは特に夕方太陽が沈みかけた時分の彩度が上がらずSI値が0になり良い天気でも曇天と
    判断してしまいます、またダイナミックレンジが狭いというか、SI値の取る範囲が狭くK曲線の閾値が
    狭いのでBI -SI方式には使えないカメラのようです、値段から考えれば仕方ないかと思います。
    Pi vcamera3は、固定ホワイトバランスが必須設定で使いましたが、夕方になると、ホワイトバランス
    が合わず、実際と違う検出をしていまいたのでいずれにしてもこの価格帯のカメラでは、夕方以降は
    使えないと思った方が無難な感じです。
    かといってこれは遊びの一環なので高いカメラを買う予定はありません、何とかプログラムのアルゴ
    リズムを屈指してある程度使えるシステムにしたい考えです。

    Tapoカメラを使って素材を収集するBI SI方式のスクリプトを以下に示します。
    大まかな原理は、空模様は以下の式に概ね従うという観測標本から出された数式です。
    BI = e^-(K*SI)
    BI:輝度情報 SI:色情報 B-R/B+G+R e:自然対数の底 K:係数
    詳しくは、本論文pdfを探して下さい。

    このスクリプトは、Raspberry Pi で使用します、AI処理では無いのでRaspberry Pi3でも動作します。
    仮想領域「SUN」を先に作っておきます。
    その中に「CAUTURE」フォルダーがありそこにキャプチャー画像は保存されます。
    キャプチャーは、1920×1080ピクセルで行いましたが、保存する前に224×224ピクセルにスクイーズ
    処理を施しています。
    CNN処理が224×224なのでそれに合わせた訳ですが、正確にいくつにしたら良い結果が出るかは検証
    していません。
    検出精度の問題は、そこの部分よりカメラ自体の光が少ない場合の感度不足、コントラスト不足、色
    情報の不足に原因があるように感じているからです。
    つまり、現状よりも精度を上げるには、ズバリもっと感度の良いカメラにしなさい、という事なのだ
    と思います。
    Tapoカメラのままで更に検知精度を上げるという課題は、次のプロジェクトで計画していますので
    今回は初回実験として捉えて下さい。

    スクリプトの説明(Raspberry付属のPython編集ソフト「Thnny」上で実行します。)

    インストールの仕方は、「RaspberryPiのインストール設定方法」を参照ください。

    ● camera_id = “rtsp://tarou:1234567@192.168.1.100:554/stream1? rtsp transport=tcp”
      tarouの部分をTapoに指定したIDを、1234567のところは設定したパスワードに
      192.168.1.100のところは指定したIPアドレスを記入します。
    ● client.connect(“192.168.1.200”, 1883, 60)#記入例
      MQTTサーバーのアドレスとポート番号を記入します。
    ● 指数係数K Valuesは、天気を判断する曲線を切り分ける重要な係数です。
      状況によって異なるため、実際にTapoカメラを接続してテスト運用し、一番適当に切り分けが
      出来る数値に変更して下さい。
      Pi camera3では結構いい結果が出せていたんですが、Tapoとの組み合わせでは、めちゃめちゃ上
      手く行きません。
      このスクリプト自体が実験用の未完成品なのであまり上手く行かないのは当然ですが、実際の天
      気を見て与えたKの値で自分で空を見た空模様と一致する画像が取れるようにして下さい。
      あくまで学習用の素材の入手が目的なのでその点をご了承ください。
    ● output_dir = ‘/home/pi/SUN/CAPTURE’
      はキャプチャー画像を保存するフォルダーの場所を指定しています。
      自動でBS 、CB、 CSの各フォルダーが出来て保その中に224×224ピクセル画像が保存されます。
    ● 30秒おきに静止画を保存していますが、適当に変更して下さい。
      schedule.every(30).seconds.do(task)
    ● 課題の一つは、夕方あるいは夜になるとカメラの性能から天気の判別が難しくなること(実際と
      違う検知がされてしまう)
      一つの解決策としては、現在の時間から判断する方法、この場合、夏時期と冬時期では、暗くな
      る時間が違うし地域によってもかなり違うので決定的な方法とは言えない。
      ベストな方法は、現在の照度を測って必要な照度に不足する場合には、測定をやめるという方法
      が良いと思う、そして夜が明けて明るくなったらまた報告を開始する仕組み。
      MQTTパブリッシャー機能を使って天気模様をNODE-REDサーバー送信すると共に、MQTTサブス
      クライバー機能を使って別に設けた「照度センサー」からの送信情報を受信し、照度が低くなると
      検知システム報告を停止する仕組みも組み込んである。
      照度センサーを設置した場合には、スクリプトの以下をコメントアウトして下さい。
      current_lux = 2000
      システムの稼働時間帯を朝7時から夕方5時に設定してあります、変更したい場合には以下を変
      更して下さい。
      start_time = 7       end_time = 17
       照度計を設置した場合、システムを遮断する照度最低値はデフォルト値は300luxに設定してありま
      す、変更する場合には、LUX_THRESHOLD = 300 を変更して下さい。

    K値による空模様を3分類するスクリプト

    import os
    import sys
    sys.path.insert(1,"/home/pi/SUN/lib/python3.13/site-packages")
    import math
    import cv2
    import numpy as np
    import matplotlib.pyplot as plt
    import random
    import time
    import datetime
    import schedule
    import pytz
    from time import sleep
    import paho.mqtt.client as mqtt
    
    
    delay = 1
    count = 0
    window_name = 'frame'
    epsilon = 1e-10  # ゼロ除算防止
    SI_mean = 0.001
    # ===== 送信制御フラグ =====
    allow_send = True        # MQTT送信許可
    current_lux = None       # 最新照度
    LUX_THRESHOLD = 300      # 閾値
    time_allow = False
    start_time = 7       #朝測定開始時間
    end_time = 17        #夜測定終了時間
    # --- TapoカメラURL ---
    camera_id = "rtsp://tarou:1234567@192.168.1.100:554/stream1?rtsp transport = tcp"
    output_dir = '/home/pi/SUN/CAPTURE/'
    # グラフ保存用ファイル
    GRAPH_PATH = "bi_si_plot.png"
    
    # K値と空の状態
    K_values = {
        'CB': 3.0000,  # 快晴
        'BS': 7.0000,  # 晴天
        'CS': 25.0000  # 曇天
    }
    
    # 記号の対応表
    MARKERS = {
        'CB': 'o',  # ◎
        'BS': 'v',  # △
        'CS': 's'   # ■
    }
    
    plt.ion()  # インタラクティブモードを有効化
        
    def on_message(client, userdata, msg):
        global allow_send, current_lux
    
        topic = msg.topic
        payload = msg.payload.decode()
    
        # --- シャットダウン ---
        if topic == "sun_pat" and payload == "shutdown":
            print("Shutdown command received")
            client.loop_stop()
            os.system("sudo shutdown -h now")
            return
    
        # --- 照度受信 ---
        if topic == "lux":
            try:
                current_lux = float(payload)
                current_lux = 2000            #照度計が設置されている場合にはコメントアウトする。
                if current_lux < LUX_THRESHOLD:
                    allow_send = False
                    print(f"Lux={current_lux} → 送信停止")
                else:
                    allow_send = True
                    print(f"Lux={current_lux} → 照度OK")
            except ValueError:
                print("lux payload parse error:", payload)
                    
    
    
    def on_connect(client, userdata, flags, reason_code, properties):
        print(f"Connected mqtt broker {reason_code}")
        client.subscribe("lux")
        client.subscribe("sun_pat")
    
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("192.168.2.242", 1884, 60)
    client.loop_start()
    
    def publish(data):
        if not allow_send:
            print("送信停止中のため publish をスキップ")
            return
        client.publish('sky_pat', payload=data, qos=0, retain=False)
        print(f"send {data} to sky_pat")
    
    def save_image_with_sequence(image, directory, filename_prefix, start_num=1, padding=5):  
        if not os.path.exists(directory):
            os.makedirs(directory)
        num = start_num
        while True:
            # 現在の日時を取得
            now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
            filename = str(now.year) + str(now.month) + str(now.day) + str(now.hour) + '-' + f"{str(num).zfill(padding)}.png" 
            filepath = os.path.join(directory, filename)
            if not os.path.exists(filepath):
                cv2.imwrite(filepath, image)
                print(f"保存しました: {filepath}")
                break
            num += 1
    
    # ファイル名のプレフィックス
    filename_prefix = "image"
    # 連番の開始番号
    start_number = 1
    # 連番の桁数
    padding = 5
    
    def create_initial_plot():
        """ 初回にBI-SIグラフを作成し保存 """
        SI_range = np.linspace(-0.1, 0.4, 100)  # SI の範囲
    
        plt.figure(figsize=(5, 5))
        for label, K in K_values.items():
            BI_curve = np.exp(-K * SI_range)  # BI値の計算
            plt.plot(SI_range, BI_curve, label=f"{label} (K={K})")
    
        plt.xlabel("SI (Sky Index)")
        plt.ylabel("BI (Brightness Index)")
        plt.title("BI-SI Relationship")
        plt.ylim(0, 1)  # BI軸を 0-1 に固定
        plt.legend()
        plt.grid(True)
        
        # 画像として保存
        plt.savefig(GRAPH_PATH)
        plt.close()
    
    # 初回のみグラフ作成
    if not os.path.exists(GRAPH_PATH):
        create_initial_plot()
    
    def update_plot(SI_new, BI_new, sky_type):
        """ BI-SIグラフに新しいデータ点をプロットし、グラフを更新 """
        SI_range = np.linspace(-0.1, 0.4, 100)  # SI の範囲を統一
    
        plt.clf()  # 既存のプロットをクリア
        for label, K in K_values.items():
            BI_curve = np.exp(-K * SI_range)  # BI値の計算
            plt.plot(SI_range, BI_curve, label=f"{label} (K={K})")
        dot = 'gray'
        # 新しいデータ点をプロット
        if sky_type == 'CB' or sky_type == 'BS' or sky_type == 'CS':
            dot = 'gray'
        elif SI_new < 0:
            dot = 'red'
        marker = MARKERS.get(sky_type, 'x')  # デフォルト 'x'
        plt.scatter(SI_new, BI_new, color = dot, marker=marker, s=100, label=f"{sky_type} \
    ({SI_new:.3f}, {BI_new:.3f})")
    
        # 軸ラベルとタイトル
        plt.xlabel("SI (Sky Index)")
        plt.ylabel("BI (Brightness Index)")
        plt.title("BI-SI Relationship (Updated)")
        plt.ylim(0, 1)  # BI軸を 0-1 に固定
        plt.legend()
        plt.grid(True)
        
        # グラフを保存 & 更新
        plt.savefig(GRAPH_PATH)
        plt.draw()
        plt.pause(0.1)  # 一瞬待つことで、インタラクティブモードで更新が反映される
    
    #定期実行する関数を準備
    def task():
        global time_allow
        
            # 空模様の分類
        def classify_sky(SI, BI):       
            # BI = e^(-K * SI) の関係から、各K値に対するBIを計算
            differences = {}
            for key, K in K_values.items():
                if SI < 0:
                    SI = -SI
                expected_BI = math.exp(-K * SI)
                differences[key] = abs(BI - expected_BI)
    
            # 最も差が小さいものを選択
            closest_sky = min(differences, key=differences.get)
            return closest_sky
        
        now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
        hour = now.hour
    
        if hour < start_time or hour >= end_time:
            print("hour{時夜間のため送信停止}")
            time_allow = False
            return
        else:
            time_allow = True
        
        cap = cv2.VideoCapture(camera_id)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) # カメラ画像の横幅設定
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) # カメラ画像の縦幅設定
        if not cap.isOpened():
            print("x RTSPストリームに接続出来ません",flush=True)
            sys.exit(1)
        #カメラからの画像取得
        ret, frame = cap.read()
        if not ret:
           print("フレーム取得失敗",flush=True)
           sys.exit(1)
        # BGRチャンネルに分割
        DNB, DNG, DNR = cv2.split(frame)
        # SIの計算
        SI = (DNB.astype(np.float32) - DNR.astype(np.float32)) / (DNB.astype(np.float32) + \
                                                                  DNR.astype(np.float32) + epsilon)
        SI_mean = np.mean(SI)
        # BIの計算
        BI = (DNB.astype(np.float32) + DNG.astype(np.float32) + DNR.astype(np.float32)) / (255 * 3)
        BI_mean = np.mean(BI)
        # SIマップを可視化(0~255に正規化)
        SI_vis = cv2.normalize(SI, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)   
        # BIマップを可視化(0~255に正規化)
        BI_vis = cv2.normalize(BI, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)  
    
        # 現在の空模様を分類
        current_sky = classify_sky(SI_mean, BI_mean)    
        #K
        calk_K = abs(np.log(BI_mean)/SI_mean)
          
        #mqtt publish
        snd = ''
        tenki = ''
        if current_sky == 'CB':
            tenki = '快晴'
            snd = '1'
        elif current_sky == 'BS':
            tenki = '晴れ'
            snd = '2'
        elif current_sky == 'CS':
            tenki ='曇天' 
            snd = '3'
        if not (allow_send and time_allow):
            print("送信条件未達(照度 or 時刻)")
            return
        
        #結果を送信
        publish(snd)    
        preview = cv2.resize(frame,(224,224))
        #保存先のディレクトリ
        save_image_with_sequence(preview, output_dir + current_sky + '/', "image")
    
        # 表示用プレビュー      
        cv2.imshow("Frame", preview)
        cv2.waitKey(1)
        #cv2.imshow("SI Map", SI_vis) 
        #cv2.imshow("BI Map", BI_vis)
            
        # 結果を表示
        print(f"SIの平均値: {SI_mean:.4f}")
        print(f"BIの平均値: {BI_mean:.4f}")
        print(f"現在のK:{calk_K:.4f}")
        print(f"[{time.strftime('%H:%M:%S')}] 現在の空模様:{tenki}")
    
        # 例: 新しいデータをプロット
        SI_new = SI_mean # 取得したSI値
        BI_new = BI_mean       
        sky_type = current_sky # 推定された空模様
        update_plot(SI_new, BI_new, sky_type)
    
    
    #スケジュール登録   
    schedule.every(30).seconds.do(task)
    #イベント実行
    while True:  
        if cv2.waitKey(delay) & 0xFF == ord('q'):
            break
        schedule.run_pending()
        sleep(1)
    

    (2) キャプチャーした天気画像を整理する。
     BS、CB、CSフォルダーに収納された画像はK値が適切であれば大まかには合っている筈です。
     しかし、誤って収容されたデーターも多いので最終的には自分で目視にて正確に分類します。
     快晴の定義は「雲の量が1割以下(0~1割)です。
     晴れの定義は「雲の量が2~8割」です。
     それ以外は曇りと分類します。
     但し、全天カメラ画像では無いので、上記の仕様を適用しても余り意味は薄い感じもします。
     自分の主観による判断で良いと思います。(厳密な気象観測目的では無いので)
     難しいのは、夕方の晴れと他の天気との分類です。
     Tapoカメラの性能も高くないので、限度はあります、暗くなったらあきらめましょう。
     実際夕方Tapoの収集した空模様は、私が見ても判断できない場合があります。

     キャプチャー画像のファイル名は自動で日付や時間を元に自動生成したものです。
     分類が完了したら、CNN学習で使用できるような名前にリネームします。
     次のスクリプトを使ってリネームを掛けて下さい。
     リネームするとCB-00005.png BS-00251.png CS-00024.pngのような形式に変わります。
     このスクリプトはドッカー内で実行します。


     リネームするスクリプト(ドッカー内のJupyter notebookで実行する。
     ドッカーをまだ作っていない場合には以下を参照してドッカーを先に作っておいて下さい。
     但し、リネームスクリプトは問題ないですが、実際のCNN学習では、グラフィックカードの下限が
     あります、概ねNVIDIA RTX3060 12GB以下の性能では能力不足で動作しませんのでご注意下さい。
             (CUDA対応のJupyter notebookが動くドッカーを作る
     スクリプトの説明
     cls_name = “BS”
     target_folder = “/data/sun/” + cls_name
     最初にRaspberry Piでキャプチャー収集したお天気の画像をフォルダーごとPC側にコピーします。
     USBメモリーなどを使用します。
     例えば、F:data/sunフォルダの中へコピーします。
     ドッカーUbuntu側からは/data/sunでアクセスします。

    素材の名前をリネームするスクリプト

    import os
    import glob
    
    #クラスの名前
    cls_name = "BS"   
    # 現在のディレクトリ内の 'images' フォルダを指します
    target_folder = "/data/sun/" + cls_name 
    
    def rename_images_sequentially(folder_path, start_num=1, num_digits=4):
        """
        指定されたフォルダ内の画像ファイル名を連番に変更します。
    
        :param folder_path: 対象フォルダのパス
        :param start_num: 開始番号 (デフォルト: 1)
        :param num_digits: 連番の桁数 (デフォルト: 4, 例: 0001)
        """
        # 対応する画像ファイルの拡張子リスト
        extensions = ('*.jpg', '*.jpeg', '*.png', '*.gif', '*.bmp', '*.tiff')
        files = []
        for ext in extensions:
            # globを使用して指定拡張子のファイルを取得し、リストに追加
            files.extend(glob.glob(os.path.join(folder_path, ext)))
    
        # ファイル名をソートすることで、ある程度の順序(ファイル名順)を保証
        # os.listdir() の結果は順序が不定な場合があるため、sorted() が重要
        files.sort()
    
        print(f"対象ファイル数: {len(files)} 件")
    
        for index, file_path in enumerate(files, start=start_num):
            # ファイルの拡張子を取得
            _, file_extension = os.path.splitext(file_path)
    
            # 新しいファイル名を生成 (例: 0001.jpg)
            new_file_name = cls_name + '-' + f"{index:0{num_digits}d}{file_extension}"
            new_file_path = os.path.join(folder_path, new_file_name)
    
            # リネーム実行
            try:
                os.rename(file_path, new_file_path)
                print(f"'{os.path.basename(file_path)}' -> '{new_file_name}'\
                に変更しました。")
            except OSError as e:
                print(f"エラー: '{os.path.basename(file_path)}' のリネームに\
                失敗しました: {e}")
    
    if __name__ == "__main__":
        
        # フォルダパスが存在することを確認
        if os.path.isdir(target_folder):
            rename_images_sequentially(target_folder)
        else:
            print(f"指定されたフォルダが見つかりません: {target_folder}")
            print("スクリプト内の 'target_folder' 変数を正しいパスに編集してください。")

    リネームが完了したら、F:data/にlearning_datasetsフォルダーを作ります、そしてその中に更にtrainフ
    ォルダーとtestフォルダーを、更にそれらの中にBS、CB、CSのフォルダーも作ります。
    図で示すと次のようになります。
    先ほどリネームした各素材画像を8対2の割合でtrain配下とtest配下のフォルダーに配分します。
    これで準備は完了です。


    (3) 集めた画像でCNNデイープラーニングする。
    スクリプトの原型は赤石 雅典先生の本「最短コースでわかる PyTorch &深層学習プログラミング」の
    記事を参考にしました。
    この本は、今まで読んだちっとも理解できない本と違い私のような馬鹿でも分かったような気がした一品です、数式から丁寧に説明されているので理解出来ました。
    併せてお勧めなのが、同じ先生が出されている「最短コースでわかる ディープラーニングの数学」
    です、私は両方購入しました。
    webで「赤石-雅典 github」と検索すれば、丁寧なフォローもあり人柄が伺えます。
    こうゆう種の本は、人によって相性があるのは否めませんが、私はどうも訳本が苦手で読んでいても
    内容が頭に入って来ません。
    私が馬鹿という理由はおいといて、原因は訳者ではなく英語圏の人の言い回しにあるような気がします。
    いずれにしても赤石先生の本は個人的に私にとってはAI学習の突破口になりました。
    感謝してもしきれません。(次の本が出ないかなー待ちどうしい)

    スクリプトの説明
    ● data_dir= ‘/data/learning_datasets’
      は学習データーのあるフォルダーの場所をUbuntu側から見えるアドレスに指定しています。
    ● 学習するクラス名を記述しています。
      フォルダの並びはフォルダー名のアルァベット順になっています。
      classes = [‘BS’,’CB’,’CS’]
    ● 後で検知に使用する為のモデルの書き出しを行っています。
      PyTorchモデルとONNXモデルです。
      モデル名は、プロジェクトの名前 + 学習結果の最良な検証精度(Validation accuracy)の値を四捨
      五入した数値に自動的に設定しています。
      このモデルファイルは、WindowsPCでもRaspberryPiでも使えます。
      モデル名:sunでval accが95.34ならsun_95.pth またはsun_95.onnxとなります。
    ● 学習させる際の回数「epoch」は最大500epochになっていますが、実際に実施される回数は、過
      学習を防止する為、自動で抑止して最適なところで停止するよう設計してあります。
      また、停止した後、学習過程でも最もval accが高いepochの物を書き出せるようになっています。
      この機能により、1週間掛けて「200epochやったのに結果は上手く行っていなかった、20epoch
      で良かったのにまたやり直しかぁ~」という事がなくなります。
      私がやった例ではだいたい29epoch程度で自動終了します(要約1時間21分程度)。
      (i7-8700 3.2GHz 32MRAM GTX4080Super 16GB Windows11環境)
      最良点は19epochでした。(過学習の兆候が現れてから+10epoch様子を見る)
      途中で停止ボタンを押した場合でも、そこまでの「best_epock情報を保存していますのでpth
      やonnixモデルの書き出しが可能です。
      1epochに掛かる時間、学習開始からの経過時間、などの表示は、epochが終了時点で更新されます
      終了予想時間は、最大値の500epochに対する時間になります。
      グラフ上の縦の赤色破線は途中経過までの最大val accを表しています。
      val accが低下してくると警告として表示します。
    ● 学習損失、学習精度曲線グラフと各クラス別 val acc数値表と棒グラフを表示します。
      これにより、個々のデーターの問題点等が一目で分かるので不足と思われるクラスを見つけて修正
      再学習が容易になります。
    ● 過学習の兆候が始まってから引き続き10epoch実行を続けます。
      もう少し早く終わらせたい場合には、「#学習」の部分中のスクリプトpatience = 10  を変更して
      下さい。

    CNN空模様を学習するスクリプト
    (WindowsPC上の仮想Ubuntuドッカー上のJupyter notebookで動かします。)

    # 必要ライブラリ・コマンドの導入
    !pip install --upgrade pip
    !pip install onnx
    !pip install pandas
    !pip install japanize_matplotlib | tail -n 1
    !pip install torchviz | tail -n 1
    !pip install torchinfo | tail -n 1
    w = !apt install tree
    print(w[-2])
    
    # 必要ライブラリのインポート
    %matplotlib inline
    import time, os, signal
    import numpy as np
    import matplotlib.pyplot as plt
    import japanize_matplotlib
    import pandas as pd
    from tqdm import tqdm
    from IPython.display import clear_output
    
    # PyTorch関連ライブラリのインポート
    import torch
    from torch import tensor
    import torch.nn as nn
    import torch.optim as optim
    from torchinfo import summary
    from torchviz import make_dot
    import torchvision.transforms as transforms
    from torch.utils.data import Dataset, DataLoader
    import torchvision.datasets as datasets
    
    # warning表示off
    import warnings
    warnings.simplefilter('ignore')
    # デフォルトフォントサイズ変更
    plt.rcParams['font.size'] = 14
    # デフォルトグラフサイズ変更
    plt.rcParams['figure.figsize'] = (6,6)
    # デフォルトで方眼表示ON
    plt.rcParams['axes.grid'] = True
    # numpyの表示桁数設定
    np.set_printoptions(suppress=True, precision=5)
    
    # GPUチェック
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)
    
    # 共通関数のダウンロード
    !git clone https://github.com/makaishi2/pythonlibs.git
    # 共通関数のロード
    from pythonlibs.torch_lib1 import *
    # 共通関数の存在
    print(README)
    
    # 検証データ用 : 正規化のみ実施
    test_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(0.5, 0.5)
    ])
    # 訓練データ用: 正規化に追加で反転とRandomErasingを実施
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(0.5, 0.5),
        transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value=0, inplace=False)
    ])
    
    # ツリーのベースディレクトリ
    data_dir = '/data/sun/learning_datasets'
    # 訓練データディレクトリと検証データディレクトリの指定
    import os
    train_dir = os.path.join(data_dir, 'train')
    test_dir = os.path.join(data_dir, 'test')
    # join関数の結果確認
    print(train_dir, test_dir)
    #本プロジェクト名
    project = 'sun'
    # 分類先クラスのリスト作成
    classes = ['BS', 'CB','CS']
    def fix_class_order(dataset, classes):
        dataset.class_to_idx = {cls: i for i, cls in enumerate(classes)}
        dataset.classes = classes
    
        # samples を作り直す
        new_samples = []
        for path, _ in dataset.samples:
            cls_name = os.path.basename(os.path.dirname(path))
            if cls_name in dataset.class_to_idx:
                new_samples.append((path, dataset.class_to_idx[cls_name]))
    
        dataset.samples = new_samples
        dataset.targets = [s[1] for s in new_samples]
        
    # データセット定義
    # 訓練用
    train_data = datasets.ImageFolder(train_dir, transform=train_transform)
    fix_class_order(train_data, classes)
    # 訓練データのイメージ表示用
    train_data2 = datasets.ImageFolder(train_dir, transform=test_transform)
    # 検証用
    test_data  = datasets.ImageFolder(test_dir,  transform=test_transform)
    
    # データ件数確認
    print(f'訓練データ: {len(train_data)}件')
    print(f'検証データ: {len(test_data)}件')
    
    #show_images_labels関数上書き
    def show_images_labels(dataloader, classes, model=None, device=None, max_images=20):
        model.eval() if model else None
    
        images, labels = next(iter(dataloader))
        images = images[:max_images]
        labels = labels[:max_images]
    
        if model:
            images_gpu = images.to(device)
            with torch.no_grad():
                outputs = model(images_gpu)
                _, preds = torch.max(outputs, 1)
            preds = preds.cpu()
        else:
            preds = None
    
        plt.figure(figsize=(15, 6))
    
        for i in range(len(images)):
            ax = plt.subplot(2, max_images // 2, i + 1)
    
            img = images[i].numpy().transpose(1, 2, 0)
            img = (img + 1) / 2
            plt.imshow(img)
            ax.axis("off")
    
            true_label = classes[labels[i]]
            ax.text(
                0.5, 1.05,
                f"GT: {true_label}",
                fontsize=9,
                color="black",
                ha="center",
                transform=ax.transAxes
            )
    
            if preds is not None:
                pred_label = classes[preds[i]]
                color = "black" if preds[i] == labels[i] else "red"
                ax.text(
                    0.5, -0.15,
                    f"Pred: {pred_label}",
                    fontsize=9,
                    color=color,
                    ha="center",
                    transform=ax.transAxes
                )
    
        plt.tight_layout()
        plt.show()
    
    # データローダー定義
    batch_size = 50
    # 訓練用
    train_loader = DataLoader(train_data, 
          batch_size=batch_size, shuffle=True)
    # 検証用
    test_loader = DataLoader(test_data, 
          batch_size=batch_size, shuffle=False)
    # イメージ表示用
    train_loader2 = DataLoader(train_data2, 
          batch_size=50, shuffle=True)
    test_loader2 = DataLoader(test_data, 
          batch_size=50, shuffle=True)
    
    # 検証用データ(50件)
    torch_seed()
    show_images_labels(test_loader2, classes, None, None)
    
    # ファインチューニング版
    
    # 学習済みモデルvgg19_bnをパラメータ付きで読み込む
    from torchvision import models
    net = models.vgg19_bn(pretrained = True)
    # 乱数初期化
    torch_seed()
    # 最終ノードの出力を24に変更する
    in_features = net.classifier[6].in_features
    net.classifier[6] = nn.Linear(in_features, len(classes))
    # AdaptiveAvgPool2d関数の取り外し
    net.avgpool = nn.Identity()
    # GPUの利用
    net = net.to(device)
    # 学習率
    lr = 0.001
    # 損失関数定義
    criterion = nn.CrossEntropyLoss()
    # 最適化関数定義
    optimizer = optim.SGD(net.parameters(),lr=lr,momentum=0.9)
    # historyファイルも同時に初期化する
    history = np.zeros((0, 5))
    
    print(train_data.classes)
    print(train_data.class_to_idx)
    
    # 関数定義
    #train_epoch関数
    def train_epoch(model, optimizer, criterion, dataloader, device):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
    
        pbar = tqdm(dataloader, desc="Train", leave=False)
    
        for images, labels in pbar:
            images = images.to(device)
            labels = labels.to(device)
    
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    
            running_loss += loss.item() * images.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    
            pbar.set_postfix(loss=loss.item())
    
        return running_loss / total, correct / total
    
    
    #eval_epoch関数
    def eval_epoch(model, criterion, dataloader, device):
        model.eval()
        running_loss = 0.0
        correct = 0
        total = 0
    
        pbar = tqdm(dataloader, desc="Val", leave=False)
    
        with torch.no_grad():
            for images, labels in pbar:
                images = images.to(device)
                labels = labels.to(device)
    
                outputs = model(images)
                loss = criterion(outputs, labels) 
    
                running_loss += loss.item() * images.size(0)
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
    
                pbar.set_postfix(loss=loss.item())
    
        epoch_loss = running_loss / total
        epoch_acc = correct / total
    
        return epoch_loss, epoch_acc
    
    #学習
    # 表示設定
    pd.set_option('display.max_columns', None)
    # ====== 共通関数 ======
    def sec_to_hms(sec):
        sec = int(sec)
        h = sec // 3600
        m = (sec % 3600) // 60
        s = sec % 60
        return f"{h:02d}:{m:02d}:{s:02d}"
    
    # ====== 保存設定 ======
    SAVE_DIR = "./checkpoints"
    os.makedirs(SAVE_DIR, exist_ok=True)
    HISTORY_CSV = f"{SAVE_DIR}/history.csv"
    BEST_MODEL = f"{SAVE_DIR}/best_model.pth"
    
    # ====== 学習設定 ======
    max_epochs = 500
    patience = 10
    
    best_val_acc = 0.0
    best_epoch = 0
    best_state_dict = None
    no_improve_count = 0
    history = []
    
    train_start_time = time.time()
    epoch_times = []
    
    # ====== グラフ初期化(1回だけ)=====
    plt.ioff()
    fig, ax = plt.subplots(1, 2, figsize=(13, 4))
    display(fig)
    prev_val_acc = None
    
    try:
        for epoch in range(1, max_epochs + 1):
            epoch_start = time.time()
    
            train_loss, train_acc = train_epoch(
                net, optimizer, criterion, train_loader, device
            )
            val_loss, val_acc = eval_epoch(
                net, criterion, test_loader, device
            )
    
            epoch_time = time.time() - epoch_start
            epoch_times.append(epoch_time)
    
            elapsed = time.time() - train_start_time
            avg_epoch = sum(epoch_times) / len(epoch_times)
            eta = avg_epoch * (max_epochs - epoch)
    
            history.append([
                epoch,
                train_loss, train_acc,
                val_loss, val_acc,
                sec_to_hms(epoch_time),
                sec_to_hms(elapsed)
            ])
    
            # ===== 表示更新 =====
            clear_output(wait=True)
    
            print(f"Epoch {epoch}/{max_epochs}")
            print(f"  train_acc = {train_acc:.4f}  train_loss = {train_loss:.4f}")
            print(f"  val_acc   = {val_acc:.4f}  val_loss   = {val_loss:.4f}")
            print(f"  epoch時間(平均): {sec_to_hms(avg_epoch)}")
            print(f"  経過時間       : {sec_to_hms(elapsed)}")
            print(f"  終了予想時間   : {sec_to_hms(eta)}")
    
            df = pd.DataFrame(
                history,
                columns=[
                    "epoch",
                    "train_loss", "train_acc",
                    "val_loss", "val_acc",
                    "epoch_time", "elapsed_time"
                ]
            )
            display(df.tail(10))
    
            # ===== グラフ更新 =====
            ax[0].cla()
            ax[1].cla()
    
            ax[0].plot(df["epoch"], df["train_loss"], label="train_loss")
            ax[0].plot(df["epoch"], df["val_loss"], label="val_loss")
            ax[0].set_title("Loss")
            ax[0].grid()
    
            ax[1].plot(df["epoch"], df["train_acc"], label="train_acc")
            ax[1].plot(df["epoch"], df["val_acc"], label="val_acc")
            ax[1].set_title("Accuracy")
            ax[1].grid()
    
            if best_epoch > 0:
                ax[0].axvline(best_epoch, color="red", linestyle="--")
                ax[1].axvline(best_epoch, color="red", linestyle="--")
                ax[1].text(
                    best_epoch,
                    best_val_acc,
                    f"best={best_val_acc:.3f}",
                    color="red"
                )
    
            ax[0].legend()
            ax[1].legend()
            fig.canvas.draw_idle()
            display(fig)
    
            # ===== val_acc 低下警告 =====
            if prev_val_acc is not None and val_acc < prev_val_acc:
                print(" 警告: val_acc が低下しています")
    
            prev_val_acc = val_acc
    
            # ===== ベスト更新 =====
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                best_epoch = epoch
                best_state_dict = net.state_dict()
                torch.save(best_state_dict, BEST_MODEL)
                no_improve_count = 0
            else:
                no_improve_count += 1
    
            df.to_csv(HISTORY_CSV, index=False)
    
            if no_improve_count >= patience:
                print("\n過学習の兆候を検出")
                print(f"最良 val_acc: {best_val_acc:.4f}")
                print(f"最良 epoch : {best_epoch}")
                break
    
    except KeyboardInterrupt:
        print("\n 学習を中断しました(Ctrl+C)")
    
    finally:
        if best_state_dict is not None:
            net.load_state_dict(best_state_dict)
            torch.save(best_state_dict, BEST_MODEL)
        df.to_csv(HISTORY_CSV, index=False)
        print(" 学習状態を保存しました")
    
    # 検証データへの結果表示
    torch_seed()
    show_images_labels(test_loader2, classes, net, device)
    
    # PyTorch 保存
    acc_percent = int(round(best_val_acc * 100))
    base_name = f"{project}_{acc_percent}"
    pth_name = f"{base_name}.pth"
    torch.save(net.state_dict(), pth_name)
    print(f"PyTorchモデル保存: {pth_name}")
    
    # ONNX 保存
    onnx_name = f"{base_name}.onnx"
    dummy_input = torch.randn(1, 3, 224, 224).to(device)
    
    torch.onnx.export(
        net,
        dummy_input,
        onnx_name,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={'input': {0: 'batch_size'},
                      'output': {0: 'batch_size'}},
        opset_version=11
    )
    print(f"ONNXモデル保存: {onnx_name}")
    
    #クラス別 accuracy を計算する関数
    def evaluate_per_class_accuracy(model, dataloader, classes, device):
        model.eval()
        
        num_classes = len(classes)
        correct = np.zeros(num_classes, dtype=int)
        total = np.zeros(num_classes, dtype=int)
    
        with torch.no_grad():
            for images, labels in dataloader:
                images = images.to(device)
                labels = labels.to(device)
    
                outputs = model(images)
                _, preds = torch.max(outputs, 1)
    
                for i in range(len(labels)):
                    label = labels[i].item()
                    pred = preds[i].item()
                    total[label] += 1
                    if pred == label:
                        correct[label] += 1
    
        acc = np.zeros(num_classes)
        for i in range(num_classes):
            if total[i] > 0:
                acc[i] = correct[i] / total[i] * 100
            else:
                acc[i] = np.nan  # データなし
    
        return acc, correct, total
    
    
    #検証データで評価
    per_class_acc, correct, total = evaluate_per_class_accuracy(
        net, test_loader, classes, device
    )
    
    for i, cls in enumerate(classes):
        print(f"{cls:15s} : {per_class_acc[i]:6.2f}% ({correct[i]}/{total[i]})")
    
    
    #グラフ表示
    import matplotlib.pyplot as plt
    
    plt.figure(figsize=(10, 6))
    bars = plt.bar(range(len(classes)), per_class_acc)
    plt.xticks(range(len(classes)), classes, rotation=90)
    plt.ylabel("Validation Accuracy (%)")
    plt.title("Class-wise Validation Accuracy")
    
    # 80%未満を赤に
    for i, acc in enumerate(per_class_acc):
        if not np.isnan(acc) and acc < 80:
            bars[i].set_color("red")

    (4) 空模様検知報告システムの運用
      ここからは、Raspberry Pi4やRaspberry Pi5で行う作業になります。
      Raspberry Pi3は、このようなAIシステムには使用できません、PythonもVer3.10以上が必須になり
      ます。
      Rapberry Piの環境設定がまだの場合には、以下の文を参考にして下さい。
        「RaspberryPiのインストール設定方法
      Tapoカメラは、屋外からRTSP通信でネットに繋がっていますから、Raspberry Piは室内に設置し
      て使用します。
      MQTTパブリッシュ機能で別に設置したNODE-REDサーバーに送信して空模様を報告します。
      NODE-REDは、Google HOMEへ必要に応じて送信し、Google HOMEは音声で告知してくれます。
      スクリプトの説明 (Raspberry Pi搭載のPython編集ソフト「Thonny」で動きます。
      model.load_state_dict(torch.load(“rtsp96.pth“, map_location=”cpu”))
      モデル名は先に学習で書き出したモデル名です、実際に書き出された名前にして下さい。
      また、このスクリプトが置かれたフォルダー内にモデルをコピーしておいて下さい。
      sys.path.insert(0,”/home/pi/SUN/lib/python3.13/site-packages”)
      Raspberry Pi5などの最新OSでは、Python3はバージョンが3.13でインストールされます。
      また、Python3.13の場合、「Thonny」の設定で以下のところを変更して下さい。
      Thonny右下にあるメニューをクリックして表示される画面を出してPython3の実行ファイル
      を/home/pi/SUN/bin/python3.13に変更して下さい。
      output_dir = ‘/home/pi/SUN/CAPTURE’
      MQTT送信をする以外に画像のキャプチャーもする際(検知能力の検証用あるいは再学習用)には
      このフォルダーに仕分けされて保存されます。
      使う場合には、save_image_with_sequence(preview, output_dir + current_sky + ‘/’, “image”)
      のコメントアウトを外して使用して下さい。(そのまま使用すると満杯になりますので注意)
      キャプチャー間隔は3分になっていますが、適当に変更して下さい。
      MQTT送信のトピック名は「sky_pat」になっています、「sky pattern」の略です。
      適当に変更して下さい。
      NODE-RED側からの命令で必要に応じてシャットダウン命令をMQTT経由で受けられます。
      トピックを「sky_pat」で「shutdown」を送信して下さい。
      別途設置された「照度センサー」からのMQTT送信信号を受信し暗くなったら空模様告知を停止
      するようになっています、また同時に時間も考慮してあります。
      current_lux = 2000 #照度計が設置されている場合にはコメントアウトする。
      照度センサーの設置がまだの場合の為に、現在の照度は2000lxであると設定してありますので使用
      する場合には、コメントアウトして下さい。
      RTSP_URL = “rtsp://tarou:1234567@192.168.2.100:554/stream1?rtsp transport = tcp” #仮の値です
      NODE-REDサーバー上のMQTTブローカーアドレス、IPアドレス、ポート番号、TapoのRTSP通信
      のID、パスワード、IPアドレスは自分のアドレスに書き換えて使用します。


      空模様検知報告システムにスクリプト(Pytorchモデル対応)
      (Raspberry Pi4またはPi5で使用します、Python3.11以降用 Raspberry Pi3では動きません)

    import os
    import sys
    sys.path.insert(1,"/home/pi/SUN/lib/python3.13/site-packages")
    import numpy as np
    import sys
    import time
    import random
    import torch
    from torchvision import transforms, models
    import cv2
    from PIL import Image
    import datetime
    import schedule
    import pytz
    from time import sleep
    import paho.mqtt.client as mqtt
    
    dt_now = datetime.datetime.now()
    delay = 1
    output_dir = '/home/pi/SUN/CAPTURE/'
    frame = []
    # ===== 送信制御フラグ =====
    allow_send = True        # MQTT送信許可
    current_lux = None       # 最新照度
    LUX_THRESHOLD = 300      # 閾値
    time_allow = False
    start_time = 7       #朝測定開始時間
    end_time = 17        #夜測定終了時間
    
    def on_message(client, userdata, msg):
        global allow_send, current_lux
    
        topic = msg.topic
        payload = msg.payload.decode()
    
        # --- シャットダウン ---
        if topic == "sun_pat" and payload == "shutdown":
            print("Shutdown command received")
            client.loop_stop()
            os.system("sudo shutdown -h now")
            return
    
        # --- 照度受信 ---
        if topic == "lux":
            try:
                current_lux = float(payload)
                current_lux = 2000            #照度計が設置されている場合にはコメントアウトする。
                if current_lux < LUX_THRESHOLD:
                    allow_send = False
                    print(f"Lux={current_lux} → 送信停止")
                else:
                    allow_send = True
                    print(f"Lux={current_lux} → 照度OK")
            except ValueError:
                print("lux payload parse error:", payload)
    
    def on_connect(client, userdata, flags, reason_code, properties):
        print(f"Connected mqtt broker {reason_code}")
        client.subscribe("lux")
        client.subscribe("sun_pat")
    
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("192.168.2.242", 1884, 60)
    client.loop_start()
    
    def publish(data):
        if not allow_send:
            print("送信停止中のため publish をスキップ")
            return
        client.publish('sky_pat', payload=data, qos=0, retain=False)
        print(f"send {data} to sky_pat")
    
    def save_image_with_sequence(image, directory, filename_prefix, start_num=1, padding=5):
        global current_sky
        
        if not os.path.exists(directory):
            os.makedirs(directory)
    
        num = start_num
        while True:
            # 現在の日時を取得
            now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
            filename = str(now.year) + str(now.month) + str(now.day) + str(now.hour) + '-' + \
                       f"{str(num).zfill(padding)}.png"
    
            filepath = os.path.join(directory, filename)
            if not os.path.exists(filepath):
                cv2.imwrite(filepath, image)
                print(f"保存しました: {filepath}")
                break
            num += 1
            
    filename_prefix = "image"
    start_number = 1
    padding = 5
    
    # --- モデル定義 ---
    model = models.vgg19_bn(weights=None)
    model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 3)
    model.avgpool = torch.nn.Identity()
    model.load_state_dict(torch.load("sun_96.pth", map_location="cpu"))
    model.eval()
    
    # --- 前処理(学習時と同じ) ---
    preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5,0.5],std=[0.5,0.5,0.5])
    ])
    
    # --- TapoカメラURL ---
    RTSP_URL = "rtsp://tarou:1234567@192.168.1.100:554/stream1?rtsp transport = tcp" 
    
    classes = ["BS", "CB", "CS"]
    label_to_index = {label: idx for idx, label in enumerate(classes)}
    
    def task():
        global time_allow
    
        now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
        hour = now.hour
    
        if hour < start_time or hour >= end_time:
            print(f"{hour}時夜間のため送信停止")
            time_allow = False
            return
        else:
            time_allow = True
    
        # ==== RTSP接続 ====
        cap = cv2.VideoCapture(RTSP_URL)
        if not cap.isOpened():
            print("RTSPストリームに接続できません", flush=True)
            sys.exit(1)
        # カメラ画像取得
        ret, frame = cap.read()
        if not ret:
            print("フレーム取得失敗", flush=True)
            sys.exit(1)
        # 表示用プレビュー
        preview = cv2.resize(frame,(224,224))
        cv2.imshow("Frame", preview)
        cv2.waitKey(1)
        frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)    
        img_pil = Image.fromarray(frame)
        img_tensor = preprocess(img_pil).unsqueeze(0)  # NCHWに変換
         # 推論
        with torch.no_grad():
            outputs = model(img_tensor)
            predicted = torch.argmax(outputs, dim=1).item()
            current_sky = classes[predicted]
            tenki = ''
            snd = ''
            if current_sky == 'CB':
                tenki = '快晴'
                snd = '1'
            elif current_sky == 'BS':
                tenki = '晴天'
                snd = '2'
            elif current_sky == 'CS':
                tenki = '曇天'
                snd = '3'
            print(f"[{time.strftime('%H:%M:%S')}] 現在の空模様:{tenki}")
            
            if not (allow_send and time_allow):
                print("送信条件未達(照度 or 時刻)")
                return
            
            #結果を送信
            publish(snd)
    
            # 保存先のディレクトリ
            save_image_with_sequence(preview, output_dir + current_sky + '/', "image")
    
    #スケジュール登録
    schedule.every(180).seconds.do(task)
    #イベント実行
    while True:
        if cv2.waitKey(delay) & 0xFF == ord('q'):
            break
        schedule.run_pending()
        sleep(1)
    
    

    空模様検知報告システムにスクリプト(ONNXモデル対応)
      (Raspberry Pi4またはPi5で使用します、Python3.11以降用 Raspberry Pi3では動きません)

    import time
    import os
    import sys
    sys.path.insert(1,"/home/pi/SUN/lib/python3.13/site-packages")
    import cv2
    import numpy as np
    import sys
    import datetime
    import time
    import schedule
    from time import sleep
    from PIL import Image
    import pytz
    import onnxruntime as ort
    from torchvision import transforms
    import paho.mqtt.client as mqtt
    dt_now = datetime.datetime.now()
    delay = 1
    output_dir = '/home/pi/SUN/CAPTURE/'
    # --- TapoカメラURL ---
    RTSP_URL = "rtsp://tarou:1234567@192.168.1.100:554/stream1?rtsp transport = tcp" 
    
    # --- モデル・クラス設定 ---
    ONNX_MODEL = "sun_96.onnx"  # ONNXモデルファイル
    CLASS_NAMES = ["BS", "CB", "CS"]  # 学習時のクラス順
    IMG_SIZE = (224, 224)
    # ===== 送信制御フラグ =====
    allow_send = True        # MQTT送信許可
    current_lux = None       # 最新照度
    LUX_THRESHOLD = 300      # 閾値
    time_allow = False
    start_time = 7       #朝測定開始時間
    end_time = 17        #夜測定終了時間
    
    # --- 前処理(学習時と同じ正規化設定にする) ---
    preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
        
    def on_message(client, userdata, msg):
        global allow_send, current_lux
    
        topic = msg.topic
        payload = msg.payload.decode()
    
        # --- シャットダウン ---
        if topic == "sun_pat" and payload == "shutdown":
            print("Shutdown command received")
            client.loop_stop()
            os.system("sudo shutdown -h now")
            return
    
        # --- 照度受信 ---
        if topic == "lux":
            try:
                current_lux = float(payload)
                current_lux = 2000            #照度計が設置されている場合にはコメントアウトする。
                if current_lux < LUX_THRESHOLD:
                    allow_send = False
                    print(f"Lux={current_lux} → 送信停止")
                else:
                    allow_send = True
                    print(f"Lux={current_lux} → 照度OK")
            except ValueError:
                print("lux payload parse error:", payload)
    
    
    
    def on_connect(client, userdata, flags, reason_code, properties):
        print(f"Connected mqtt broker {reason_code}")
        client.subscribe("lux")
        client.subscribe("sun_pat")
        
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("192.168.2.242", 1884, 60)
    client.loop_start()
    
    def publish(data):
        if not allow_send:
            print("送信停止中のため publish をスキップ")
            return
        client.publish('sky_pat', payload=data, qos=0, retain=False)
        print(f"send {data} to sky_pat")
    
    def save_image_with_sequence(image, directory, filename_prefix, start_num=1, padding=5):
        global current_sky
        
        if not os.path.exists(directory):
            os.makedirs(directory)
    
        num = start_num
        while True:
            # 現在の日時を取得
            now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
            filename = str(now.year) + str(now.month) + str(now.day) + str(now.hour) + '-' + \
                       f"{str(num).zfill(padding)}.png"
            filepath = os.path.join(directory, filename)
            if not os.path.exists(filepath):
                cv2.imwrite(filepath, image)
                print(f"保存しました: {filepath}")
                break
            num += 1
            
    filename_prefix = "image"
    start_number = 1
    padding = 5
    
    def task():
        global time_allow
    
        now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
        hour = now.hour
    
        if hour < start_time or hour >= end_time:
            print(f"{hour}時夜間のため送信停止")
            time_allow = False
            return
        else:
            time_allow = True
            
        # ==== RTSP接続 ====
        cap = cv2.VideoCapture(RTSP_URL)
        if not cap.isOpened():
            print("RTSPストリームに接続できません", flush=True)
            sys.exit(1)
        # カメラ画像取得
        ret, frame = cap.read()
        if not ret:
            print("フレーム取得失敗", flush=True)
            sys.exit(1)
        
            # 表示用プレビュー
        preview = cv2.resize(frame,(224,224))
        cv2.imshow("Frame", preview)
        cv2.waitKey(1)
        
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    
        # 前処理
        pil_frame = Image.fromarray(frame_rgb)
        tensor_frame = preprocess(pil_frame).unsqueeze(0)  # (1,3,224,224)
        input_data = tensor_frame.numpy().astype(np.float32)  # ここで4次元に確定
    
        # --- ONNX 推論セッション ---
        session = ort.InferenceSession(ONNX_MODEL, providers=["CPUExecutionProvider"])
    
        # 推論
        ort_inputs = {session.get_inputs()[0].name: input_data}
        ort_outs = session.run(None, ort_inputs)
        predicted = np.argmax(ort_outs[0])
        current_sky = CLASS_NAMES[predicted]
        tenki = ''
        snd = ''
        if current_sky == 'CB':
            tenki = '快晴'
            snd = '1'
        elif current_sky == 'BS':
            tenki = '晴天'
            snd = '2'
        elif current_sky == 'CS':
            tenki = '曇天'
            snd = '3'
        print(f"[{time.strftime('%H:%M:%S')}] 現在の空模様:{tenki}")
        if not (allow_send and time_allow):
            print("送信条件未達(照度 or 時刻)")
            return
        #結果を送信
        publish(snd)
    
        # 保存先のディレクトリ
        save_image_with_sequence(preview, output_dir + current_sky + '/', "image")
        
    #スケジュール登録
    schedule.every(180).seconds.do(task)
    
    #イベント実行
    while True:
        if cv2.waitKey(delay) & 0xFF == ord('q'):
            break
        schedule.run_pending()
        sleep(1)
    
    

    (5) 今後の課題について
      先に製作実験したK値による予測よりは、大分良くなりましたが、夕方時の色彩が無くなった頃
      の予測に不満があります。
      光があれば安価なカメラでも問題は無いのですが、光が無くなると問題が多いようです。
      その辺の事柄を分析して更に正確な予測が出来るよう改良版を作ろうと考えております。
    (生きていれば)