
    j#                       d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZmZmZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ d dlmZ  ej        e          5  d dlmZ ddd           n# 1 swxY w Y   erd dlZ d dl!Z d dl"Z#d dlm$Z$ d dl%m&Z&  ed           G d d                      Z' G d de          Z(dS )    )annotationsN)	dataclass)perf_counter)TYPE_CHECKINGClassVarLiteral)eprint)wrap_ldf)NoPickleOption)IcebergCatalogConfig0_convert_iceberg_to_object_store_storage_options)#_normalize_windows_iceberg_file_uri)_InternalPlPathProviderConfig)gen_uuid_v7)PyLazyFrame)StorageOptionsDictT)kw_onlyc                      e Zd ZU ded<   ded<   ded<   ded<   ded<   ded	<   d
ed<   ded<   ded<   ded<   eddddd*d            Zd+dZd,dZd-d!Zd.d$Z	d/d(Z
d0d)ZdS )1IcebergSinkStatestrpy_catalog_class_modulepy_catalog_class_qualnamecatalog_namedict[str, str]catalog_properties
table_nameLiteral['append', 'overwrite']moder   iceberg_storage_propertiessink_uuid_strz%NoPickleOption[pyiceberg.table.Table]table_zNoPickleOption[pl.DataFrame]commit_result_dfappendN)r   catalogstorage_optionstargetstr | pyiceberg.table.Tabler$   7pyiceberg.catalog.Catalog | IcebergCatalogConfig | Noner%   StorageOptionsDict | Nonereturnc                  t          | t                    rt          j        |d          n7t          t	          | j                  | j        j        | j        j                  }ddlm	} |j
        |u r3dt	          |           dt          | dd           }t          |          t          |j
        j        |j
        j        |j        |j        t          | t                    r| n&d	                    |                                           ||pi t#                                                      t'          t          | t                    s| nd           t'                      

  
        S )Nsink_iceberg)fn_name)class_name
propertiesr   )NoopCatalogz4cannot sink to static Iceberg table: type(target) = z%, getattr(target, 'catalog', None) = r$   .)
r   r   r   r   r   r   r   r    r!   r"   )
isinstancer   r   *_from_api_parameter_or_environment_defaulttyper$   r/   r0   pyiceberg.catalog.noopr1   r.   getattr	TypeErrorr   
__module____qualname__joinr   hexr   )r&   r   r$   r%   catalog_configr1   msgs          \/home/longshao/multi-rider-rag/.venv/lib/python3.11/site-packages/polars/io/iceberg/_sink.pynewzIcebergSinkState.new2   s{    &#&&$O*    %//,%~8   	" 	766666 K//L<<L L&-fi&F&FL L  C.. $2$9$D&4&;&H',-8!+FC!8!8Uvvchhv{{}}>U>U'6'<"%--++--!
630G0G"Q&&TRR+--
 
 
 	
    pyiceberg.table.Tablec                   | j                                         t          j        | j                  }| j                            d          }t          ||d                   }|dd          D ]}t          ||          } || j        fi | j	        }| j         
                    |                    | j                             | j                                         S )Nr2   r      )r!   get	importlibimport_moduler   r   splitr7   r   r   set
load_tabler   )selfmodulequalname_splitcatalog_classpartr$   s         r?   tablezIcebergSinkState.tablea   s    ;??$,T-IJJF!;AA#FFN=Dq)> >M 'qrr* = = 't < <#mD$5QQ9PQQGKOOG..t??@@@{   rA   c                *    t          | j                  S N)r   r   )rK   s    r?   _get_converted_storage_optionsz/IcebergSinkState._get_converted_storage_optionsr   s    ?+
 
 	
rA   lfpl.LazyFramec                P    t          |j                            |                     S rR   )r
   _ldfr,   )rK   rT   s     r?   attach_sinkzIcebergSinkState.attach_sinkw   s     ,,T22333rA   plfr   c                d   ddl m} ddlm}m} dd l}|                                 }|j        }|j        }|	                                j
        rd}	t          |	          |                                j
        rd}	t          |	          |                    |j                  x}
rd|
 d}	t          |	           |||j        d          rd	|j         d}	t          |	          dd
lm}  ||                                          }d} ||j        |j                  x}rd}t)          ||z  d          }t+          |                               |j        t1          |                                           t5                      |          ||                                 d          j        S )Nr   TableProperties)property_as_boolproperty_as_intz!sink to partitioned Iceberg tablez%sink to Iceberg table with sort orderz5sink to Iceberg table with custom location provider ''Fzsink to Iceberg table with ')schema_to_pyarrowl        )r0   property_name   l    )file_path_providerapproximate_bytes_per_fileT)arrow_schemar%   lazy)pyiceberg.tabler\   pyiceberg.utils.propertiesr]   r^   polarsrP   metadatar0   specfieldsNotImplementedError
sort_orderrE   WRITE_PY_LOCATION_PROVIDER_IMPLOBJECT_STORE_ENABLEDpyiceberg.io.pyarrowr`   schemaWRITE_TARGET_FILE_SIZE_BYTESminr
   sink_parquetPartitionByr   output_base_pathPlIcebergPathProviderConfigrS   rW   )rK   rY   r\   r]   r^   plrP   table_metadatatable_propertiesr>   location_provider_implr`   re   rd   vestimated_compression_ratios                   r?   _attach_resolved_sinkz&IcebergSinkState._attach_resolved_sinkz   s1   333333PPPPPPPP

)4::<< 	+5C%c***$ 	+9C%c***%5%9%9;&
 &
 
! 	+/+/ / /  &c***oBE
 
 	+ Y1UXXXC%c***::::::((88%;"%0)F
 
 
 
1 	 +,'),+a/* *&
 SMM\78M8M8O8OPP'B'D'D/I  
 * $ C C E E  	 	 	
rA   data_file_paths	list[str]pl.DataFramec                   dd l }dd l}t                      }|j        j                                        }|rt          d| j         d           |                                 }|j	        }t          j        dk    rd |D             }|                                5 }| j        dk    r#ddlm}	 |                     |	                       |rt          d           t                      }
|                    |d	
           |r4t                      |
z
  }t          d|dd           t          d           t                      }
d d d            n# 1 swxY w Y   |r't                      }||
z
  }t          d|dd           |j	        }||k    sJ | j                             |j        d|id|j        id                     |r||z
  }t          d|dd           | j                                        S )Nr   z!IcebergSinkState[commit]: mode: 'r_   win32c                V    g | ]&}|                     d           rd|dd          n|'S )zfile:///zfile://   N)
startswith).0ps     r?   
<listcomp>z+IcebergSinkState.commit.<locals>.<listcomp>   sO        '(ll:&>&>E"1QRR5"""A  rA   	overwrite)
AlwaysTruez)IcebergSinkState[commit]: begin add_filesF)check_duplicate_filesz,IcebergSinkState[commit]: finish add_files (z.3fzs)z2IcebergSinkState[commit]: begin transaction commitz5IcebergSinkState[commit]: finish transaction commit (metadata_pathrD   )rr   heightz8IcebergSinkState[commit]: finished, total elapsed time: s)ri   polars._utils.loggingr   _utilsloggingverboser	   r   rP   metadata_locationsysplatformtransactionpyiceberg.expressionsr   delete	add_filesr"   rI   	DataFrameStringrE   )rK   r   ry   ri   function_start_instantr   rP   original_metadata_locationtxr   start_instantelapsednownew_metadata_locationtotal_elapseds                  r?   commitzIcebergSinkState.commit   s   $$$$!--'//11 	ECtyCCCDDD

%*%<"<7"" (  O
    	+ByK''<<<<<<		**,,''' DBCCC(NNMLL&+    
  M&..=8UgUUUUVVVKLLL(NNM+	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+.  	..CM)GWWWWW   !& 7$(BBBBB!!BL "78'3  	
 	
 	
  	"88M_=____   $((***s   B)EEEc                   ddl m} |                                 }|j        }|j        }|                    |j                  x}r|                    d          n|j                            d           d}| d| j	         dS )Nr   r[   /z/data)
rg   r\   rP   rj   r0   rE   WRITE_DATA_PATHrstriplocationr    )rK   r\   rP   rz   r{   pathrw   s          r?   rw   z!IcebergSinkState.output_base_path   s    333333

)4 ),,_-LMMM?DKK"+22377>>> 	 #::T%7::::rA   )
r&   r'   r   r   r$   r(   r%   r)   r*   r   )r*   rB   )r*   r   )rT   rU   r*   rU   )rY   r   r*   r   )r   r   r*   r   )r*   r   )__name__r9   r:   __annotations__staticmethodr@   rP   rS   rX   r   r   rw    rA   r?   r   r   !   s=            """"&&&&OOO((((222211112222 08KO59,
 ,
 ,
 ,
 ,
 \,
\! ! ! !"
 
 
 

4 4 4 4=
 =
 =
 =
~E+ E+ E+ E+N; ; ; ; ; ;rA   r   c                  ,    e Zd ZU dZded<   dZded<   dS )rx   icebergzClassVar[str]pl_path_provider_idparquetzClassVar[Literal['parquet']]	extensionN)r   r9   r:   r   r   r   r   rA   r?   rx   rx     s5         )22222.7I777777rA   rx   ))
__future__r   
contextlibrF   importlib.utilr   dataclassesr   timer   typingr   r   r   r   r	   polars._utils.wrapr
   polars.io.cloud._utilsr   polars.io.iceberg._datasetr   r   polars.io.iceberg._utilsr   polars.io.partitionr   suppressImportErrorpolars._plrr   pyiceberg.catalog	pyicebergrg   ri   ry   r   polars._typingr   r   rx   r   rA   r?   <module>r      sW   " " " " " "             



 ! ! ! ! ! !       3 3 3 3 3 3 3 3 3 3 ( ( ( ( ( ( ' ' ' ' ' ' 1 1 1 1 1 1        I H H H H H = = = = = =Z%% ( (''''''( ( ( ( ( ( ( ( ( ( ( ( ( ( (  2''''''111111 4k; k; k; k; k; k; k; k;\8 8 8 8 8"? 8 8 8 8 8s   #A66A:=A: