
    Wj4                     n   d dl Z d dlmZ d dlmZ d dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZmZmZmZmZmZ d dlm Z m!Z! d dl"m#Z#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 e5e6e7ee8         dz  ee8         f         f         Z9dgZ:d+de8de6de6fdZ;	 d,dej<        dz  defdZ=dej>        de?fdZ@	 d+ded ee8         de6dej>        fd!ZAd"ede7e9ej<        dz  f         fd#ZB G d$ d%e          ZC	 d,d&ed'e6d(e(d)e!dz  def
d*ZDdS )-    N)Sequence)cast)_get_device_module)ShardedTensor)TensorProperties)Shard)ChunkShardingSpec)unflatten_state_dict)DefaultLoadPlanner)BytesStorageMetadataChunkStorageMetadataMetadataMetadataIndexSTATE_DICT_TYPEr   TensorStorageMetadata)LoadPlanLoadPlanner)_create_read_items create_read_items_for_chunk_list)load_state_dict)StorageReader)_element_wise_add_element_wise_sub_normalize_device_info)_get_default_group)_create_chunk_sharded_tensor)_remote_device)DTensor!load_sharded_optimizer_state_dictcudaglobal_rankdevice_typereturnc                     |dk    rdS t          |          }|                                r%t          || |                                z            S dS )Ncpu)r   is_availabler   device_count)r!   r"   device_modules      k/home/longshao/multi-rider-rag/.venv/lib/python3.11/site-packages/torch/distributed/checkpoint/optimizer.py_gen_rank_devicer*   8   sb    eu&{33M!!## 
%}'A'A'C'CC
 
 	
 5    pgc                 v    t           j                                       j         -fdt	          t          j                              D             }n. fdt	                                                     D             }t          dt          t          t          t          z           |                    S )Nc           	      <    g | ]}d | dt          |           S rank:/)r*   ).0idxpg_device_types     r)   
<listcomp>z(_create_colwise_spec.<locals>.<listcomp>H   sE     
 
 
 BCAA*3??AA
 
 
r+   c                 b    g | ]+}d | dt          t          j        |                     ,S r/   )r*   distget_global_rank)r2   r3   r,   r4   s     r)   r5   z(_create_colwise_spec.<locals>.<listcomp>M   sR     
 
 
 \C[[*4+?C+H+H.YY[[
 
 
r+   r   dim
placements)r7   distributed_c10d_get_pg_default_devicetyperangeget_world_sizesizer	   r   listr   str)r,   r;   r4   s   ` @r)   _create_colwise_specrD   C   s     *AA"EEJN	z
 
 
 
T02233
 
 




 
 
 
 
RWWYY''
 
 

 ^c12J??   r+   valc                 &   t          |           t          u rt          |                                           dk    rdS t          |                                 d         j                  t          u rdS t          |                                 d         j                  t
          u rt          d          n[t          |           t
          u rEt          | j                  t
          u st          | j                  t          u rt          d          dS )Nr   FTz1Cannot handle DTensor nested inside ShardedTensorzCannot handle nested DTensor)r>   r   lenlocal_shardstensorr   
ValueError_local_tensor)rE   s    r)   _is_nested_tensorrL   W   s    CyyM!!s!!""a''5  ""1%,-->>4  ""1%,--88PQQQ 9	cg		S7**d33D.E.E.V.V78885r+   propsrA   c                 F   |dk    r:t          t          j        t          |                                                    }n4t          j        |t          |                                                    }t          j        || j        | j        | j        | j	        |          S )Nr%   )rA   dtypelayoutrequires_grad
pin_memorydevice)
r   torchrS   r   current_deviceemptyrO   rP   rQ   rR   )rM   rA   r"   rS   s       r)   _alloc_tensorrW   f   s     eel$6{$C$C$R$R$T$TUU+K88GGII
 
 ;k|)#   r+   
state_dictc                    i }d}|                                  D ]\  }}d|                                f||<   t          |          rt          |                                          dk    st          d          t          |t                    st          d          |                                d         }|j        j	        |j        j
        f||<   |j        j        }||fS )a+  
    Load the right TP slice of the optimizer state.

    This is not easy since the per-tensor slicing can't be inferred from checkpoint metadata.
    We take advantage of the model state_dict producing a sliced ST to figure out what we need to load.
    This is pretty fragile and it might be easier for FSDP to compute this info for us.
    Returns a dictionary where keys are the same of the state_dict and the value is a tuple of
    (offset, size) for the current rank TP slice.
    N.B. The state_dict *MUST* come from FSDP.sharded_state_dict.
    N   z%Cannot handle ST with multiple shardsz$Can only handle nested ShardedTensorr   )itemsrA   rL   rG   rH   AssertionError
isinstancer   metadatashard_offsetsshard_sizesrI   _process_group)rX   specsdp_pgkeyvalueshards         r)   _get_state_dict_2d_layoutrg   z   s     #%E&*E &&(( 0 0
UEJJLL)c
U## 
	0u))++,,11$%LMMMe]33 M$%KLLL&&((+E,*E#J L/E 	 r+   c                        e Zd ZU eeef         ed<   eed<   eed<   deee	e
         f         ddf fdZdefdZd	edej        f fd
Z xZS )_ReaderWithOffsettranslationrX   r^   fqn_to_offsetr#   Nc                     t                                                       || _        t          i           | _        i | _        i | _        d S N)super__init__rk   r   r^   rX   rj   )selfrk   	__class__s     r)   ro   z_ReaderWithOffset.__init__   sC    * r+   c           	         g }i | _         | j                                        D ]\  }}| j        j        |         }t          |t                    s|t          |||          z  }B|| j        vr|t          |||          z  }`| j        |         }t          |
                                          dk    st          d          |
                                d         }t          t          j        t          |j        j        |                    t          j        |j        j                            g}t%          |t'          t(          |          |          }|D ]s}	|	j        j        t          d          t/          |	j        j        |          }
t1          j        |	j        t          j        |
                    }|| j         |	j        <   t||z  }t5          |          S )NrZ   z Expected exactly one local shardr   )offsetssizesz"dest_index.offset must not be None)offset)rj   rX   r[   r^   state_dict_metadatar]   r   r   rk   rG   rH   r\   r   rT   Sizer   r_   r`   r   r   r   
dest_indexru   r   dataclassesreplacer   )rp   requestsfqnobjmdru   original_shardlocal_chunksreqsrioriginal_offsetoriginal_indexs               r)   create_local_planz#_ReaderWithOffset.create_local_plan   s   --// &	 &	HC237Bc=11 .sB<<<$,,,.sB<<<',Fs''))**a//$%GHHH --//2N$!J).*A*OQWXX   *^%<%HII	  L 4T/44l D
  A A='/()MNNN"3BM4H&"Q"Q!,!4M%*_*E*E" " " 3A //HH!!!r+   indexc                 x    t                                          | j                            ||                    S rm   )rn   lookup_tensorrj   get)rp   r   rq   s     r)   r   z_ReaderWithOffset.lookup_tensor   s.    ww$$T%5%9%9%%G%GHHHr+   )__name__
__module____qualname__dictr   __annotations__r   r   rC   r   intro   r   r   rT   Tensorr   __classcell__)rq   s   @r)   ri   ri      s         m]23333d3+=&> 4      *"8 *" *" *" *"XI= IU\ I I I I I I I I I Ir+   ri   model_state_dictoptimizer_keystorage_readerplannerc                 R   |                                 }t          |           \  }}t          j                            |          j        }t          |          }|wg }	t          t          j                              D ]B}
t          ||
|
                                z            }|	                    d|
 d|            Ct          d|	          }nt          |          }i }i }|j                                        D ]n\  }}|j        |         }|d         |k    r t#          |t$                    rd||<   ;|j                                        dk    rt+          |j        |j        |          ||<   w|qt/          t+          |j        |j        |          t          j                    t          j                    |
                                t3                                ||<   |d	         }|                    |d|j        f          d         }t7          |j        j        |j        j        |j        j        |j        j        |j        j         
          }|!                    tE          j#        |          |          }g }t          j        |          }|j$        D ]p}tK          tL          |j'                  (                                |k    r3|                    tS          t+          |j        |j*        |          |                     qtW          j,        |||          }||v r=||         d         /tK          tZ          t\                   ||         d                   ||<   |||<   pt_          |||ta          |          n|           tc          ||j                  }|S )a  
    Load a state_dict in conjunction with FSDP sharded optimizer state.

    This is the current recommended way to checkpoint FSDP.
    >>> # xdoctest: +SKIP
    >>> import torch.distributed.checkpoint as dist_cp
    >>> # Save
    >>> model: torch.nn.Model
    >>> optim_params = model.parameters()
    >>> optim = torch.optim.SGD(optim_params, lr=0.01)
    >>> # Save
    >>> with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT):
    >>>     state_dict = {
    >>>         "optimizer": FSDP.optim_state_dict(model, optim),
    >>>         "model": model.state_dict()
    >>>     }
    >>>     dist_cp.save_state_dict(
    >>>         state_dict=optim_state,
    >>>         storage_writer=dist_cp.FileSystemWriter("checkpoint"),
    >>>         planner=dist_cp.DefaultSavePlanner(),
    >>>     )
    >>>
    >>> # Load
    >>> with FSDP.state_dict_type(model_tp, StateDictType.SHARDED_STATE_DICT):
    >>>     model_state_dict = model_tp.state_dict()
    >>>     checkpoint = {
    >>>         "model": model_state_dict
    >>>     }
    >>>     dist_cp.load_state_dict(
    >>>         state_dict=checkpoint,
    >>>         storage_reader=dist_cp.FileSystemReader(checkpoint_file),
    >>>         planner=dist_cp.DefaultLoadPlanner(),
    >>>     )
    >>>     model.load_state_dict(checkpoint["model_state"])
    >>>
    >>>     optim_state = dist_cp.load_sharded_optimizer_state_dict(
    >>>         model_state_dict,
    >>>         optimizer_key="optimizer",
    >>>         storage_reader=dist_cp.FileSystemReader("checkpoint"),
    >>>     )
    >>>
    >>>     flattened_osd = FSDP.optim_state_dict_to_load(
    >>>        model, optim, optim_state["optimizer"]
    >>>     )
    >>>
    >>>     optim.load_state_dict(flattened_osd)
    Nr0   r1   r   r9   z
<bytes_io>rZ   )rank
world_sizenum_devices_per_noder,      )rO   rP   rQ   memory_formatrR   )rI   r^   )process_group)rX   r   r   )2read_metadatarg   r7   r<   r=   r>   r   r?   r@   r   r'   appendr	   rD   rv   r[   planner_datar]   r   rA   numelrW   
propertiesr   get_rankr   r   ShardTensorPropertiesrO   rP   rQ   r   rR   build_metadatarT   rw   shards_metadatar   r   	placementr   r   r`   r   +_init_from_local_shards_and_global_metadatar   r   r   ri   r
   )r   r   r   r   r^   layout_specsrc   dp_pg_device_typer(   r;   idevice_infosharding_specrX   rk   rd   re   key_pathspec_key
alloc_sizer   st_mdrH   current_rankshard_mdsts                             r)   r   r      s   j ++--H34DEEL%-DDUKKP&'899M}
t*,,-- 	9 	9A0!1}'A'A'C'C#C K 7a77+778888)aJGGG,U33 #%J.0M288:: 8! 8!
U(-A;-''e122 	*JsO :""+ %*.? JsOO ]:e.
<MNN]__.00%2%?%?%A%A%''  JsOO  {H%))(T5:4FGGJJ.&,'.#.<#.< +6  J "00J1G1GTTEL=//L!1 
 
(:;;@@BBlRR##,!,h.BDU    "*	      Je5  B <''L,B1,E,Q%)(3-h9OPQ9R%S%Sc" JsOO %494E!-0007	    &j(2GHHJr+   )r    rm   )Ery   collections.abcr   typingr   rT   torch.distributeddistributedr7   torch._utilsr   +torch.distributed._shard.sharded_tensor.apir   0torch.distributed._shard.sharded_tensor.metadatar   r   -torch.distributed._shard.sharded_tensor.shardr   :torch.distributed._shard.sharding_spec.chunk_sharding_specr	   )torch.distributed.checkpoint._nested_dictr
   ,torch.distributed.checkpoint.default_plannerr   %torch.distributed.checkpoint.metadatar   r   r   r   r   r   $torch.distributed.checkpoint.plannerr   r   ,torch.distributed.checkpoint.planner_helpersr   r   .torch.distributed.checkpoint.state_dict_loaderr   $torch.distributed.checkpoint.storager   "torch.distributed.checkpoint.utilsr   r   r   "torch.distributed.distributed_c10dr   #torch.distributed.fsdp._shard_utilsr   torch.distributed.remote_devicer   torch.distributed.tensorr   r   rC   tupler   STATE_DICT_2D_LAYOUT__all__r*   ProcessGrouprD   r   boolrL   rW   rg   ri   r    r+   r)   <module>r      s       $ $ $ $ $ $                    + + + + + + E E E E E E      @ ? ? ? ? ? X X X X X X J J J J J J K K K K K K                  G F F F F F F F        K J J J J J > > > > > >         
 B A A A A A L L L L L L : : : : : : , , , , , , Cx}t';Xc]'J!KKL 
 (
 # C S     $( D    (5< D      FL #+C=?B
\   (  
!2T!99:       F:I :I :I :I :I* :I :I :IB #'	N N%NN "N 4	N
 N N N N N Nr+   